mirror of https://github.com/apache/druid.git

Commit 8e8736291f (parent 142668cda5): Added puller.
@@ -0,0 +1,2 @@
+CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE;
+CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE;
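The two tables above back the Cassandra segment storage introduced by this commit: index_storage holds the zipped segment file in chunks, and descriptor_storage holds one row per segment key with its lastModified timestamp and descriptor JSON. The following is a minimal sketch, not part of the commit, of how a segment key maps onto those tables; it reuses the Astyanax calls that appear in the puller and pusher further down, while the class name, keyspace argument, and output path are illustrative only.

// Sketch only, assuming the schema above and the Astyanax recipes used elsewhere in this commit.
import java.io.FileOutputStream;

import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.serializers.StringSerializer;

public class CassandraSegmentTablesSketch
{
  public static void readSegment(Keyspace keyspace, String segmentKey) throws Exception
  {
    // index_storage: the zipped segment file, written/read in chunks via ChunkedStorage.
    ChunkedStorage
        .newReader(new CassandraChunkedStorageProvider(keyspace, "index_storage"), segmentKey,
                   new FileOutputStream("/tmp/segment.gz"))
        .call();

    // descriptor_storage: one row per segment key with lastModified and the descriptor JSON.
    ColumnFamily<String, String> descriptors = new ColumnFamily<String, String>(
        "descriptor_storage", StringSerializer.get(), StringSerializer.get());
    ColumnList<String> row = keyspace.prepareQuery(descriptors).getKey(segmentKey).execute().getResult();
    long lastModified = row.getColumnByName("lastmodified").getLongValue();
    String descriptorJson = row.getColumnByName("descriptor").getStringValue();
  }
}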
@@ -141,17 +141,9 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
   private void initializeSegmentLoader()
   {
     if (segmentLoader == null) {
-      final Properties props = getProps();
       try {
-        final RestS3Service s3Client = new RestS3Service(
-            new AWSCredentials(
-                PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
-                PropUtils.getProperty(props, "com.metamx.aws.secretKey")
-            )
-        );
-
         setSegmentLoader(
-            ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(SegmentLoaderConfig.class))
+            ServerInit.makeDefaultQueryableLoader(getConfigFactory(), getProps())
         );
       }
       catch (S3ServiceException e) {
@@ -21,6 +21,7 @@ package com.metamx.druid.initialization;
 
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
@@ -41,9 +42,6 @@ import com.metamx.common.logger.Logger;
 import com.metamx.druid.DruidProcessingConfig;
 import com.metamx.druid.Query;
 import com.metamx.druid.collect.StupidPool;
-import com.metamx.druid.loading.CassandraDataSegmentPuller;
-import com.metamx.druid.loading.CassandraDataSegmentPusher;
-import com.metamx.druid.loading.CassandraDataSegmentPusherConfig;
 import com.metamx.druid.loading.DataSegmentPusher;
 import com.metamx.druid.loading.DelegatingSegmentLoader;
 import com.metamx.druid.loading.HdfsDataSegmentPuller;
@@ -60,6 +58,9 @@ import com.metamx.druid.loading.S3DataSegmentPusherConfig;
 import com.metamx.druid.loading.SegmentLoader;
 import com.metamx.druid.loading.SegmentLoaderConfig;
 import com.metamx.druid.loading.SingleSegmentLoader;
+import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig;
+import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller;
+import com.metamx.druid.loading.cassandra.CassandraDataSegmentPusher;
 import com.metamx.druid.query.QueryRunnerFactory;
 import com.metamx.druid.query.group.GroupByQuery;
 import com.metamx.druid.query.group.GroupByQueryEngine;
@@ -83,27 +84,42 @@ public class ServerInit
   private static Logger log = new Logger(ServerInit.class);
 
   public static SegmentLoader makeDefaultQueryableLoader(
-      RestS3Service s3Client,
-      SegmentLoaderConfig config
-  )
+      final ConfigurationObjectFactory configFactory,
+      final Properties props
+  ) throws S3ServiceException
   {
+    SegmentLoaderConfig config = configFactory.build(SegmentLoaderConfig.class);
     DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader();
 
-    final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
     final QueryableIndexFactory factory = new MMappedQueryableIndexFactory();
 
-    SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
+    SingleSegmentLoader s3segmentLoader = null;
+    try {
+      final RestS3Service s3Client = new RestS3Service(
+          new AWSCredentials(
+              PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
+              PropUtils.getProperty(props, "com.metamx.aws.secretKey")
+          )
+      );
+      final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
+      s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
+    } catch (com.metamx.common.ISE ise) {
+      log.warn("Could not create s3Client.", ise);
+    }
+
+    Map<String, SegmentLoader> loaderTypes = new HashMap<String, SegmentLoader>();
+    loaderTypes.put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config));
+    loaderTypes.put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config));
+    loaderTypes.put("c*",
+        new SingleSegmentLoader(new CassandraDataSegmentPuller(configFactory.build(CassandraDataSegmentConfig.class)), factory, config));
+
+    if (s3segmentLoader != null) {
+      loaderTypes.put("s3", s3segmentLoader);
+      loaderTypes.put("s3_zip", s3segmentLoader);
+    }
 
     delegateLoader.setLoaderTypes(
-        ImmutableMap.<String, SegmentLoader>builder()
-                    .put("s3", s3segmentLoader)
-                    .put("s3_zip", s3segmentLoader)
-                    .put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config))
-                    .put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config))
-                    .put("cassandra", new SingleSegmentLoader(new CassandraDataSegmentPuller(new Configuration()), factory, config))
-                    .build()
+        ImmutableMap.<String, SegmentLoader>copyOf(loaderTypes)
     );
 
     return delegateLoader;
   }
 
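The loader wiring above is driven entirely by runtime properties. Below is a hedged sketch, not part of the commit, of the properties it consults; the property names are the ones visible in this diff, the values are placeholders, and the keyspace setting read through CassandraDataSegmentConfig.getKeyspace() is configured elsewhere, so its key is not shown here.

import java.util.Properties;

// Sketch only: property names are taken from this patch; values are placeholders.
public class CassandraLoaderPropertiesSketch
{
  public static Properties example()
  {
    Properties props = new Properties();
    // Consulted by makeDefaultQueryableLoader() when it tries to build the optional S3 loader.
    props.setProperty("com.metamx.aws.accessKey", "ACCESS_KEY");
    props.setProperty("com.metamx.aws.secretKey", "SECRET_KEY");
    // Selects CassandraDataSegmentPusher in ServerInit (see the "druid.pusher.cassandra" branch below).
    props.setProperty("druid.pusher.cassandra", "true");
    // Seed host handed to Astyanax via @Config("druid.pusher.cassandra.host").
    props.setProperty("druid.pusher.cassandra.host", "cassandra-host:9160");
    return props;
  }
}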
@@ -177,7 +193,7 @@ public class ServerInit
       return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper);
     }
     else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) {
-      final CassandraDataSegmentPusherConfig config = configFactory.build(CassandraDataSegmentPusherConfig.class);
+      final CassandraDataSegmentConfig config = configFactory.build(CassandraDataSegmentConfig.class);
 
       return new CassandraDataSegmentPusher(config, jsonMapper);
     }
@@ -1,69 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.loading;
-
-import com.google.inject.Inject;
-import com.metamx.common.MapUtils;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.DataSegment;
-import org.jets3t.service.ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-
-import java.util.Map;
-
-/**
- */
-public class CassandraDataSegmentKiller implements DataSegmentKiller
-{
-  private static final Logger log = new Logger(CassandraDataSegmentKiller.class);
-
-  private final RestS3Service s3Client;
-
-  @Inject
-  public CassandraDataSegmentKiller(
-      RestS3Service s3Client
-  )
-  {
-    this.s3Client = s3Client;
-  }
-
-  @Override
-  public void kill(DataSegment segment) throws SegmentLoadingException
-  {
-    try {
-      Map<String, Object> loadSpec = segment.getLoadSpec();
-      String s3Bucket = MapUtils.getString(loadSpec, "bucket");
-      String s3Path = MapUtils.getString(loadSpec, "key");
-      String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
-
-      if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
-        log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
-        s3Client.deleteObject(s3Bucket, s3Path);
-      }
-      if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
-        log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
-        s3Client.deleteObject(s3Bucket, s3DescriptorPath);
-      }
-    }
-    catch (ServiceException e) {
-      throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier());
-    }
-  }
-}
@@ -1,170 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.loading;
-
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Closeables;
-import com.google.common.io.Files;
-import com.google.inject.Inject;
-import com.metamx.common.ISE;
-import com.metamx.common.MapUtils;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.common.s3.S3Utils;
-import com.metamx.druid.utils.CompressionUtils;
-import org.apache.commons.io.FileUtils;
-import org.jets3t.service.S3ServiceException;
-import org.jets3t.service.ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
-import org.jets3t.service.model.S3Object;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.zip.GZIPInputStream;
-
-/**
- */
-public class CassandraDataSegmentPuller implements DataSegmentPuller
-{
-  private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
-
-  private static final String BUCKET = "bucket";
-  private static final String KEY = "key";
-
-  private final RestS3Service s3Client;
-
-  @Inject
-  public CassandraDataSegmentPuller(
-      RestS3Service s3Client
-  )
-  {
-    this.s3Client = s3Client;
-  }
-
-  @Override
-  public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
-  {
-    S3Coords s3Coords = new S3Coords(segment);
-
-    log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
-
-    if (!isObjectInBucket(s3Coords)) {
-      throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
-    }
-
-    if (!outDir.exists()) {
-      outDir.mkdirs();
-    }
-
-    if (!outDir.isDirectory()) {
-      throw new ISE("outDir[%s] must be a directory.", outDir);
-    }
-
-    long startTime = System.currentTimeMillis();
-    S3Object s3Obj = null;
-
-    try {
-      s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
-
-      InputStream in = null;
-      try {
-        in = s3Obj.getDataInputStream();
-        final String key = s3Obj.getKey();
-        if (key.endsWith(".zip")) {
-          CompressionUtils.unzip(in, outDir);
-        } else if (key.endsWith(".gz")) {
-          final File outFile = new File(outDir, toFilename(key, ".gz"));
-          ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
-        } else {
-          ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
-        }
-        log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
-      }
-      catch (IOException e) {
-        FileUtils.deleteDirectory(outDir);
-        throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
-      }
-      finally {
-        Closeables.closeQuietly(in);
-      }
-    }
-    catch (Exception e) {
-      throw new SegmentLoadingException(e, e.getMessage());
-    }
-    finally {
-      S3Utils.closeStreamsQuietly(s3Obj);
-    }
-
-  }
-
-  private String toFilename(String key, final String suffix)
-  {
-    String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
-    filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
-    return filename;
-  }
-
-  private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException
-  {
-    try {
-      return s3Client.isObjectInBucket(coords.bucket, coords.path);
-    }
-    catch (ServiceException e) {
-      throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
-    }
-  }
-
-  @Override
-  public long getLastModified(DataSegment segment) throws SegmentLoadingException
-  {
-    S3Coords coords = new S3Coords(segment);
-    try {
-      S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
-      return objDetails.getLastModifiedDate().getTime();
-    }
-    catch (S3ServiceException e) {
-      throw new SegmentLoadingException(e, e.getMessage());
-    }
-  }
-
-  private static class S3Coords
-  {
-    String bucket;
-    String path;
-
-    public S3Coords(DataSegment segment)
-    {
-      Map<String, Object> loadSpec = segment.getLoadSpec();
-      bucket = MapUtils.getString(loadSpec, BUCKET);
-      path = MapUtils.getString(loadSpec, KEY);
-      if (path.startsWith("/")) {
-        path = path.substring(1);
-      }
-    }
-
-    public String toString()
-    {
-      return String.format("s3://%s/%s", bucket, path);
-    }
-  }
-}
@@ -17,13 +17,13 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  */
 
-package com.metamx.druid.loading;
+package com.metamx.druid.loading.cassandra;
 
 import org.skife.config.Config;
 
 /**
  */
-public abstract class CassandraDataSegmentPusherConfig
+public abstract class CassandraDataSegmentConfig
 {
   @Config("druid.pusher.cassandra.host")
   public abstract String getHost();
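Only getHost() is visible in this hunk, but both the puller and the pusher in this commit also call config.getKeyspace(). The following is a hedged reconstruction of the renamed class for orientation; the getKeyspace() accessor is assumed from those call sites, and its @Config property name is not shown anywhere in this diff.

package com.metamx.druid.loading.cassandra;

import org.skife.config.Config;

// Sketch of the renamed config class; only getHost() appears in the hunk above.
public abstract class CassandraDataSegmentConfig
{
  @Config("druid.pusher.cassandra.host")
  public abstract String getHost();

  // Called as config.getKeyspace() by CassandraDataSegmentPuller and CassandraDataSegmentPusher;
  // the @Config property name for the keyspace is not visible in this diff.
  public abstract String getKeyspace();
}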
@@ -0,0 +1,155 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.loading.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.io.Files;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.loading.DataSegmentPuller;
+import com.metamx.druid.loading.SegmentLoadingException;
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
+import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
+import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
+import com.netflix.astyanax.recipes.storage.ChunkedStorage;
+import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
+import com.netflix.astyanax.recipes.storage.ObjectMetadata;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+
+/**
+ */
+public class CassandraDataSegmentPuller implements DataSegmentPuller
+{
+  private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
+  private static final String CLUSTER_NAME = "druid_cassandra_cluster";
+  private static final String INDEX_TABLE_NAME = "index_storage";
+  private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
+  private static final int CONCURRENCY = 10;
+  private static final int BATCH_SIZE = 10;
+
+  private Keyspace keyspace;
+  private AstyanaxContext<Keyspace> astyanaxContext;
+  private ChunkedStorageProvider indexStorage;
+  private ColumnFamily<String, String> descriptorStorage;
+
+  public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
+  {
+    this.astyanaxContext = new AstyanaxContext.Builder()
+        .forCluster(CLUSTER_NAME)
+        .forKeyspace(config.getKeyspace())
+        .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE))
+        .withConnectionPoolConfiguration(
+            new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10)
+                .setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
+        .buildKeyspace(ThriftFamilyFactory.getInstance());
+    this.astyanaxContext.start();
+    this.keyspace = this.astyanaxContext.getEntity();
+
+    indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
+
+    descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
+        StringSerializer.get(), StringSerializer.get());
+  }
+
+  @Override
+  public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
+  {
+    String key = (String) segment.getLoadSpec().get("key");
+
+    log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir);
+
+    if (!outDir.exists())
+    {
+      outDir.mkdirs();
+    }
+
+    if (!outDir.isDirectory())
+    {
+      throw new ISE("outDir[%s] must be a directory.", outDir);
+    }
+
+    long startTime = System.currentTimeMillis();
+    ObjectMetadata meta = null;
+    try
+    {
+      final File outFile = new File(outDir, toFilename(key, ".gz"));
+      meta = ChunkedStorage
+          .newReader(indexStorage, key, Files.newOutputStreamSupplier(outFile).getOutput())
+          .withBatchSize(BATCH_SIZE)
+          .withConcurrencyLevel(CONCURRENCY)
+          .call();
+    } catch (Exception e)
+    {
+      log.error("Could not pull segment [" + key + "] from C*", e);
+      try
+      {
+        FileUtils.deleteDirectory(outDir);
+      } catch (IOException ioe)
+      {
+        log.error("Couldn't delete directory [" + outDir + "] for cleanup.", ioe);
+      }
+    }
+
+    log.info("Pull of file[%s] completed in %,d millis (%s chunks)", key, System.currentTimeMillis() - startTime,
+        meta.getChunkSize());
+  }
+
+  @Override
+  public long getLastModified(DataSegment segment) throws SegmentLoadingException
+  {
+    String key = (String) segment.getLoadSpec().get("key");
+    OperationResult<ColumnList<String>> result;
+    try
+    {
+      result = this.keyspace.prepareQuery(descriptorStorage)
+          .getKey(key)
+          .execute();
+      ColumnList<String> children = result.getResult();
+      long lastModified = children.getColumnByName("lastmodified").getLongValue();
+      log.info("Read lastModified for [" + key + "] as [" + lastModified + "]");
+      return lastModified;
+    } catch (ConnectionException e)
+    {
+      throw new SegmentLoadingException(e, e.getMessage());
+    }
+  }
+
+  private String toFilename(String key, final String suffix)
+  {
+    String filename = key.substring(key.lastIndexOf("/") + 1);
+    filename = filename.substring(0, filename.length() - suffix.length());
+    return filename;
+  }
+}
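For orientation, here is a hedged sketch, not part of the commit, of the loadSpec a Cassandra-stored segment carries: the pusher below attaches {"type": "c*", "key": ...}, ServerInit maps "c*" to a SingleSegmentLoader wrapping the puller above, and getSegmentFiles()/getLastModified() read the "key" entry back out. The exact key layout comes from the pusher's JOINER and is not fully visible in this diff, so the key value shown is illustrative only.

import java.util.Map;

import com.google.common.collect.ImmutableMap;

// Sketch only: a loadSpec of this shape is what routes a segment to CassandraDataSegmentPuller.
public class CassandraLoadSpecSketch
{
  public static Map<String, Object> example()
  {
    return ImmutableMap.<String, Object>of(
        "type", "c*",                           // matched against loaderTypes.get("c*") in ServerInit
        "key", "datasource/interval/version/0"  // illustrative row key in index_storage / descriptor_storage
    );
  }
}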
@@ -1,4 +1,4 @@
-package com.metamx.druid.loading;
+package com.metamx.druid.loading.cassandra;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -7,30 +7,29 @@ import java.io.IOException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.v1.IndexIO;
+import com.metamx.druid.loading.DataSegmentPusher;
+import com.metamx.druid.loading.DataSegmentPusherUtil;
 import com.metamx.druid.utils.CompressionUtils;
 import com.netflix.astyanax.AstyanaxContext;
 import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
 import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
 import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
 import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
 import com.netflix.astyanax.recipes.storage.ChunkedStorage;
 import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
 import com.netflix.astyanax.recipes.storage.ObjectMetadata;
+import com.netflix.astyanax.serializers.StringSerializer;
 import com.netflix.astyanax.thrift.ThriftFamilyFactory;
 
 /**
  * This is the data segment pusher for Cassandra.
- *
- * Create the schema in cli with:
- * CREATE COLUMN FAMILY indexStorage WITH comparator = UTF8Type AND key_validation_class=UTF8Type
- * CREATE COLUMN FAMILY descriptorStorage WITH comparator = UTF8Type AND key_validation_class=UTF8Type
  */
 // TODO: Auto-create the schema if it does not exist.
 // Should we make it so they can specify tables?
@@ -42,17 +41,16 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
   private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage";
   private static final int CONCURRENCY = 10;
   private static final Joiner JOINER = Joiner.on("/").skipNulls();
 
-  private final CassandraDataSegmentPusherConfig config;
+  private final CassandraDataSegmentConfig config;
   private final ObjectMapper jsonMapper;
 
   private Keyspace keyspace;
   private AstyanaxContext<Keyspace> astyanaxContext;
   private ChunkedStorageProvider indexStorage;
-  private ChunkedStorageProvider descriptorStorage;
+  private ColumnFamily<String, String> descriptorStorage;
 
   public CassandraDataSegmentPusher(
-      CassandraDataSegmentPusherConfig config,
+      CassandraDataSegmentConfig config,
       ObjectMapper jsonMapper)
   {
     this.config = config;
@@ -68,8 +66,10 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
     this.astyanaxContext.start();
     this.keyspace = this.astyanaxContext.getEntity();
 
+    descriptorStorage = new ColumnFamily<String, String>(DESCRIPTOR_TABLE_NAME,
+        StringSerializer.get(), StringSerializer.get());
+
     indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME);
-    descriptorStorage = new CassandraChunkedStorageProvider(keyspace, DESCRIPTOR_TABLE_NAME);
   }
 
   @Override
@@ -86,21 +86,18 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
     long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
     int version = IndexIO.getVersionFromDir(indexFilesDir);
 
-    // Create descriptor
-    File descriptorFile = File.createTempFile("druid", "descriptor.json");
-    Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
-
     try
     {
       ObjectMetadata indexMeta = ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
           .withConcurrencyLevel(CONCURRENCY).call();
-      ObjectMetadata descriptorMeta = ChunkedStorage
-          .newWriter(descriptorStorage, key, new FileInputStream(descriptorFile))
-          .withConcurrencyLevel(CONCURRENCY).call();
-
-      log.debug("Wrote index to C* [" + indexMeta.getParentPath() + "]");
-      log.debug("Wrote descriptor to C* [" + descriptorMeta.getParentPath() + "]");
+      byte[] json = jsonMapper.writeValueAsBytes(segment);
+      //CassandraDataSegmentDescriptor descriptor = new CassandraDataSegmentDescriptor(segment, json);
+      MutationBatch mutation = this.keyspace.prepareMutationBatch();
+      mutation.withRow(descriptorStorage, key)
+          .putColumn("lastmodified", System.currentTimeMillis(), null)
+          .putColumn("descriptor", json, null);
+      mutation.execute();
+      log.info("Wrote index to C* [" + indexMeta.getParentPath() + "]");
     } catch (Exception e)
     {
       throw new IOException(e);
@@ -110,12 +107,10 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher
         .withLoadSpec(
             ImmutableMap.<String, Object> of("type", "c*", "key", key)
         )
-        .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
+        .withBinaryVersion(version);
 
     log.info("Deleting zipped index File[%s]", compressedIndexFile);
     compressedIndexFile.delete();
-    log.info("Deleting descriptor file[%s]", descriptorFile);
-    descriptorFile.delete();
     return segment;
   }
 }
@@ -1,78 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.loading;
-
-import com.metamx.common.MapUtils;
-import com.metamx.druid.StorageAdapter;
-import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.index.QueryableIndex;
-import com.metamx.druid.index.Segment;
-import org.joda.time.Interval;
-
-import java.io.File;
-import java.util.Map;
-
-/**
- */
-public class CassandraDataSegmentLoader implements SegmentLoader
-{
-  @Override
-  public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
-  {
-    Map<String, Object> loadSpec = segment.getLoadSpec();
-    return new File(MapUtils.getString(loadSpec, "cacheDir")).exists();
-  }
-
-  @Override
-  public Segment getSegment(final DataSegment segment) throws SegmentLoadingException
-  {
-    return new Segment()
-    {
-      @Override
-      public String getIdentifier()
-      {
-        return segment.getIdentifier();
-      }
-
-      @Override
-      public Interval getDataInterval()
-      {
-        return segment.getInterval();
-      }
-
-      @Override
-      public QueryableIndex asQueryableIndex()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public StorageAdapter asStorageAdapter()
-      {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
-  @Override
-  public void cleanup(DataSegment loadSpec) throws SegmentLoadingException
-  {
-  }
-}