From bf01399a2afc65e134391ad2766a5a1f5a684343 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Wed, 1 May 2013 15:11:50 -0400 Subject: [PATCH 01/12] Cassandra Data Segment Pusher, initial commit. --- examples/cassandra/schema/druid_schema.cql | 0 index-common/pom.xml | 7 +- .../loading/CassandraDataSegmentKiller.java | 69 +++++++ .../loading/CassandraDataSegmentPuller.java | 170 ++++++++++++++++++ .../loading/CassandraDataSegmentPusher.java | 89 +++++++++ .../CassandraDataSegmentPusherConfig.java | 36 ++++ .../loading/CassandraDataSegmentLoader.java | 78 ++++++++ 7 files changed, 448 insertions(+), 1 deletion(-) create mode 100644 examples/cassandra/schema/druid_schema.cql create mode 100644 server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentKiller.java create mode 100644 server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPuller.java create mode 100644 server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java create mode 100644 server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java create mode 100644 server/src/test/java/com/metamx/druid/loading/CassandraDataSegmentLoader.java diff --git a/examples/cassandra/schema/druid_schema.cql b/examples/cassandra/schema/druid_schema.cql new file mode 100644 index 00000000000..e69de29bb2d diff --git a/index-common/pom.xml b/index-common/pom.xml index 0bae9b7a70d..d29294f09ce 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -75,7 +75,12 @@ commons-io commons-io - + + com.datastax.cassandra + cassandra-driver-core + 1.0.0-beta2 + + junit diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentKiller.java new file mode 100644 index 00000000000..e2f6ceb4d3a --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentKiller.java @@ -0,0 +1,69 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package com.metamx.druid.loading; + +import com.google.inject.Inject; +import com.metamx.common.MapUtils; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; + +import java.util.Map; + +/** + */ +public class CassandraDataSegmentKiller implements DataSegmentKiller +{ + private static final Logger log = new Logger(CassandraDataSegmentKiller.class); + + private final RestS3Service s3Client; + + @Inject + public CassandraDataSegmentKiller( + RestS3Service s3Client + ) + { + this.s3Client = s3Client; + } + + @Override + public void kill(DataSegment segment) throws SegmentLoadingException + { + try { + Map loadSpec = segment.getLoadSpec(); + String s3Bucket = MapUtils.getString(loadSpec, "bucket"); + String s3Path = MapUtils.getString(loadSpec, "key"); + String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; + + if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { + log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); + s3Client.deleteObject(s3Bucket, s3Path); + } + if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { + log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); + s3Client.deleteObject(s3Bucket, s3DescriptorPath); + } + } + catch (ServiceException e) { + throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier()); + } + } +} diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPuller.java new file mode 100644 index 00000000000..5d99720f0d4 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPuller.java @@ -0,0 +1,170 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package com.metamx.druid.loading; + +import com.google.common.io.ByteStreams; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import com.google.inject.Inject; +import com.metamx.common.ISE; +import com.metamx.common.MapUtils; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.common.s3.S3Utils; +import com.metamx.druid.utils.CompressionUtils; +import org.apache.commons.io.FileUtils; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.S3Bucket; +import org.jets3t.service.model.S3Object; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.zip.GZIPInputStream; + +/** + */ +public class CassandraDataSegmentPuller implements DataSegmentPuller +{ + private static final Logger log = new Logger(CassandraDataSegmentPuller.class); + + private static final String BUCKET = "bucket"; + private static final String KEY = "key"; + + private final RestS3Service s3Client; + + @Inject + public CassandraDataSegmentPuller( + RestS3Service s3Client + ) + { + this.s3Client = s3Client; + } + + @Override + public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + { + S3Coords s3Coords = new S3Coords(segment); + + log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); + + if (!isObjectInBucket(s3Coords)) { + throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords); + } + + if (!outDir.exists()) { + outDir.mkdirs(); + } + + if (!outDir.isDirectory()) { + throw new ISE("outDir[%s] must be a directory.", outDir); + } + + long startTime = System.currentTimeMillis(); + S3Object s3Obj = null; + + try { + s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path); + + InputStream in = null; + try { + in = s3Obj.getDataInputStream(); + final String key = s3Obj.getKey(); + if (key.endsWith(".zip")) { + CompressionUtils.unzip(in, outDir); + } else if (key.endsWith(".gz")) { + final File outFile = new File(outDir, toFilename(key, ".gz")); + ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); + } else { + ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); + } + log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime); + } + catch (IOException e) { + FileUtils.deleteDirectory(outDir); + throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj); + } + finally { + Closeables.closeQuietly(in); + } + } + catch (Exception e) { + throw new SegmentLoadingException(e, e.getMessage()); + } + finally { + S3Utils.closeStreamsQuietly(s3Obj); + } + + } + + private String toFilename(String key, final String suffix) + { + String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/' + filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end + return filename; + } + + private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException + { + try { + return s3Client.isObjectInBucket(coords.bucket, coords.path); + } + catch (ServiceException e) { + throw new SegmentLoadingException(e, "S3 fail! 
Key[%s]", coords); + } + } + + @Override + public long getLastModified(DataSegment segment) throws SegmentLoadingException + { + S3Coords coords = new S3Coords(segment); + try { + S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path); + return objDetails.getLastModifiedDate().getTime(); + } + catch (S3ServiceException e) { + throw new SegmentLoadingException(e, e.getMessage()); + } + } + + private static class S3Coords + { + String bucket; + String path; + + public S3Coords(DataSegment segment) + { + Map loadSpec = segment.getLoadSpec(); + bucket = MapUtils.getString(loadSpec, BUCKET); + path = MapUtils.getString(loadSpec, KEY); + if (path.startsWith("/")) { + path = path.substring(1); + } + } + + public String toString() + { + return String.format("s3://%s/%s", bucket, path); + } + } +} diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java new file mode 100644 index 00000000000..219c7b34ad8 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java @@ -0,0 +1,89 @@ +package com.metamx.druid.loading; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.utils.CompressionUtils; + +/** + */ +public class CassandraDataSegmentPusher implements DataSegmentPusher +{ + private static final Logger log = new Logger(CassandraDataSegmentPusher.class); + + private final CassandraDataSegmentPusherConfig config; + private static final Joiner JOINER = Joiner.on("/").skipNulls(); + private final ObjectMapper jsonMapper; + private Cluster cluster; + private Session session; + private String keyspace = null; + private String table = null; + + + public CassandraDataSegmentPusher( + CassandraDataSegmentPusherConfig config, + ObjectMapper jsonMapper) + { + this.config = config; + this.jsonMapper = jsonMapper; + this.keyspace = this.config.getKeyspace(); + this.table = this.config.getTable(); + + cluster = Cluster.builder().addContactPoints(this.config.getHost()).build(); + session = cluster.connect(); + session.execute("USE " + keyspace); + } + + @Override + public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException + { + log.info("Writing [%s] to C*", indexFilesDir); + String key = JOINER.join( + config.getKeyspace().isEmpty() ? 
null : config.getKeyspace(), + DataSegmentPusherUtil.getStorageDir(segment) + ); + + // Create index + final File compressedIndexFile = File.createTempFile("druid", "index.zip"); + long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile); + int version = IndexIO.getVersionFromDir(indexFilesDir); + + // Create descriptor + File descriptorFile = File.createTempFile("druid", "descriptor.json"); + Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); + + String statement = "INSERT INTO " + keyspace + "." + table + "(key, version, descriptor, index) VALUES (?,?,?,?)"; + PreparedStatement ps = session.prepare(statement); + byte[] indexData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile)); + byte[] descriptorData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile)); + BoundStatement bs = ps.bind(key, version, descriptorData, indexData); + session.execute(bs); + + segment = segment.withSize(indexSize) + .withLoadSpec( + ImmutableMap.of("type", "c*", "key", key) + ) + .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir)); + + + log.info("Deleting zipped index File[%s]", compressedIndexFile); + compressedIndexFile.delete(); + log.info("Deleting descriptor file[%s]", descriptorFile); + descriptorFile.delete(); + return segment; + } +} diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java new file mode 100644 index 00000000000..27ab925e2a6 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java @@ -0,0 +1,36 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.loading; + +import org.skife.config.Config; + +/** + */ +public abstract class CassandraDataSegmentPusherConfig +{ + @Config("druid.pusher.cassandra.host") + public abstract String getHost(); + + @Config("druid.pusher.cassandra.keyspace") + public abstract String getKeyspace(); + + @Config("druid.pusher.cassandra.table") + public abstract String getTable(); +} diff --git a/server/src/test/java/com/metamx/druid/loading/CassandraDataSegmentLoader.java b/server/src/test/java/com/metamx/druid/loading/CassandraDataSegmentLoader.java new file mode 100644 index 00000000000..dbf25d480f7 --- /dev/null +++ b/server/src/test/java/com/metamx/druid/loading/CassandraDataSegmentLoader.java @@ -0,0 +1,78 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. 
+ * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.loading; + +import com.metamx.common.MapUtils; +import com.metamx.druid.StorageAdapter; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.index.QueryableIndex; +import com.metamx.druid.index.Segment; +import org.joda.time.Interval; + +import java.io.File; +import java.util.Map; + +/** +*/ +public class CassandraDataSegmentLoader implements SegmentLoader +{ + @Override + public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException + { + Map loadSpec = segment.getLoadSpec(); + return new File(MapUtils.getString(loadSpec, "cacheDir")).exists(); + } + + @Override + public Segment getSegment(final DataSegment segment) throws SegmentLoadingException + { + return new Segment() + { + @Override + public String getIdentifier() + { + return segment.getIdentifier(); + } + + @Override + public Interval getDataInterval() + { + return segment.getInterval(); + } + + @Override + public QueryableIndex asQueryableIndex() + { + throw new UnsupportedOperationException(); + } + + @Override + public StorageAdapter asStorageAdapter() + { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public void cleanup(DataSegment loadSpec) throws SegmentLoadingException + { + } +} From b8c8ed8d682dfb1c57e21142e81b297a868d25c1 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Wed, 1 May 2013 15:17:15 -0400 Subject: [PATCH 02/12] Formatting. 
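
One thing to flag in the code this commit reformats: push() reads the
descriptor bytes from compressedIndexFile rather than descriptorFile, so the
descriptor column is bound to a second copy of the zipped index. A corrected
pair of reads would look like this (a sketch using the Guava ByteStreams API
the class already imports; the streams should also be closed):

    byte[] indexData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile));
    // was: new FileInputStream(compressedIndexFile)
    byte[] descriptorData = ByteStreams.toByteArray(new FileInputStream(descriptorFile));

The Astyanax rewrite in the next commit replaces this code path entirely.
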
--- .../loading/CassandraDataSegmentPusher.java | 68 +++++++++---------- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java index 219c7b34ad8..747e1c0328b 100644 --- a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java @@ -26,13 +26,12 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher private static final Logger log = new Logger(CassandraDataSegmentPusher.class); private final CassandraDataSegmentPusherConfig config; - private static final Joiner JOINER = Joiner.on("/").skipNulls(); + private static final Joiner JOINER = Joiner.on("/").skipNulls(); private final ObjectMapper jsonMapper; private Cluster cluster; private Session session; private String keyspace = null; private String table = null; - public CassandraDataSegmentPusher( CassandraDataSegmentPusherConfig config, @@ -42,7 +41,7 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher this.jsonMapper = jsonMapper; this.keyspace = this.config.getKeyspace(); this.table = this.config.getTable(); - + cluster = Cluster.builder().addContactPoints(this.config.getHost()).build(); session = cluster.connect(); session.execute("USE " + keyspace); @@ -51,39 +50,38 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher @Override public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException { - log.info("Writing [%s] to C*", indexFilesDir); - String key = JOINER.join( - config.getKeyspace().isEmpty() ? null : config.getKeyspace(), - DataSegmentPusherUtil.getStorageDir(segment) - ); + log.info("Writing [%s] to C*", indexFilesDir); + String key = JOINER.join( + config.getKeyspace().isEmpty() ? null : config.getKeyspace(), + DataSegmentPusherUtil.getStorageDir(segment) + ); - // Create index - final File compressedIndexFile = File.createTempFile("druid", "index.zip"); - long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile); - int version = IndexIO.getVersionFromDir(indexFilesDir); - - // Create descriptor - File descriptorFile = File.createTempFile("druid", "descriptor.json"); - Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); - - String statement = "INSERT INTO " + keyspace + "." 
+ table + "(key, version, descriptor, index) VALUES (?,?,?,?)"; - PreparedStatement ps = session.prepare(statement); - byte[] indexData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile)); - byte[] descriptorData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile)); - BoundStatement bs = ps.bind(key, version, descriptorData, indexData); - session.execute(bs); - - segment = segment.withSize(indexSize) - .withLoadSpec( - ImmutableMap.of("type", "c*", "key", key) - ) - .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir)); + // Create index + final File compressedIndexFile = File.createTempFile("druid", "index.zip"); + long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile); + int version = IndexIO.getVersionFromDir(indexFilesDir); - - log.info("Deleting zipped index File[%s]", compressedIndexFile); - compressedIndexFile.delete(); - log.info("Deleting descriptor file[%s]", descriptorFile); - descriptorFile.delete(); - return segment; + // Create descriptor + File descriptorFile = File.createTempFile("druid", "descriptor.json"); + Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); + + String statement = "INSERT INTO " + keyspace + "." + table + "(key, version, descriptor, index) VALUES (?,?,?,?)"; + PreparedStatement ps = session.prepare(statement); + byte[] indexData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile)); + byte[] descriptorData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile)); + BoundStatement bs = ps.bind(key, version, descriptorData, indexData); + session.execute(bs); + + segment = segment.withSize(indexSize) + .withLoadSpec( + ImmutableMap. of("type", "c*", "key", key) + ) + .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir)); + + log.info("Deleting zipped index File[%s]", compressedIndexFile); + compressedIndexFile.delete(); + log.info("Deleting descriptor file[%s]", descriptorFile); + descriptorFile.delete(); + return segment; } } From 79d802fedeec0d1fa3cd43536ea4ff6c4a2af342 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Mon, 6 May 2013 17:54:12 -0400 Subject: [PATCH 03/12] Working data segment pushed for C*. 
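
The switch from the DataStax CQL driver to Astyanax in this commit is about
the ChunkedStorage recipe: rather than binding an entire zipped index as one
blob, the recipe splits the stream across many columns, so a large segment is
never written as a single oversized column. A minimal round-trip built only
from the calls used in this commit and the next (a sketch; assumes a started
AstyanaxContext providing keyspace, with key, compressedIndexFile, and outFile
in scope):

    ChunkedStorageProvider provider =
        new CassandraChunkedStorageProvider(keyspace, "index_storage");

    // Write: chunk the input stream into columns under the given key.
    ObjectMetadata meta = ChunkedStorage
        .newWriter(provider, key, new FileInputStream(compressedIndexFile))
        .withConcurrencyLevel(10)
        .call();

    // Read: stream the chunks back, in order, into an output stream.
    ChunkedStorage
        .newReader(provider, key, Files.newOutputStreamSupplier(outFile).getOutput())
        .withBatchSize(10)
        .withConcurrencyLevel(10)
        .call();

Two loose ends in this commit, both picked up by the next one: ServerInit
registers the loader under the type "cassandra" while the pusher writes "c*"
into the segment loadSpec, and the registration passes a Hadoop Configuration
to CassandraDataSegmentPuller, whose only constructor at this point still
takes a RestS3Service.
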
--- .../examples/RealtimeStandaloneMain.java | 12 +-- index-common/pom.xml | 7 +- server/pom.xml | 5 ++ .../druid/initialization/ServerInit.java | 32 +++++--- .../loading/CassandraDataSegmentPusher.java | 76 ++++++++++++++----- .../CassandraDataSegmentPusherConfig.java | 3 - 6 files changed, 88 insertions(+), 47 deletions(-) diff --git a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java index 1dc60cd0e31..10edff548d7 100644 --- a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java @@ -45,12 +45,12 @@ public class RealtimeStandaloneMain ); // Create dummy objects for the various interfaces that interact with the DB, ZK and deep storage - rn.setSegmentPublisher(new NoopSegmentPublisher()); - rn.setAnnouncer(new NoopDataSegmentAnnouncer()); - rn.setDataSegmentPusher(new NoopDataSegmentPusher()); - rn.setServerView(new NoopServerView()); - rn.setInventoryView(new NoopInventoryView()); - + //rn.setSegmentPublisher(new NoopSegmentPublisher()); + //rn.setAnnouncer(new NoopDataSegmentAnnouncer()); + //rn.setDataSegmentPusher(new NoopDataSegmentPusher()); + //rn.setServerView(new NoopServerView()); + //rn.setInventoryView(new NoopInventoryView()); + Runtime.getRuntime().addShutdownHook( new Thread( new Runnable() diff --git a/index-common/pom.xml b/index-common/pom.xml index d29294f09ce..0bae9b7a70d 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -75,12 +75,7 @@ commons-io commons-io - - com.datastax.cassandra - cassandra-driver-core - 1.0.0-beta2 - - + junit diff --git a/server/pom.xml b/server/pom.xml index 2f66678a1ea..efe699ed1dc 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -194,6 +194,11 @@ java-xmlbuilder true + + com.netflix.astyanax + astyanax + 1.0.1 + diff --git a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java index 657ce0693d6..21d97540a2e 100644 --- a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java +++ b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java @@ -19,6 +19,18 @@ package com.metamx.druid.initialization; +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.jets3t.service.S3ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.security.AWSCredentials; +import org.skife.config.ConfigurationObjectFactory; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -29,6 +41,9 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.DruidProcessingConfig; import com.metamx.druid.Query; import com.metamx.druid.collect.StupidPool; +import com.metamx.druid.loading.CassandraDataSegmentPuller; +import com.metamx.druid.loading.CassandraDataSegmentPusher; +import com.metamx.druid.loading.CassandraDataSegmentPusherConfig; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DelegatingSegmentLoader; import com.metamx.druid.loading.HdfsDataSegmentPuller; @@ -60,17 +75,6 @@ import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory; import com.metamx.druid.query.timeseries.TimeseriesQuery; import 
com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory; import com.metamx.druid.utils.PropUtils; -import org.apache.hadoop.conf.Configuration; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; -import org.skife.config.ConfigurationObjectFactory; - -import java.lang.reflect.InvocationTargetException; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicLong; /** */ @@ -96,6 +100,7 @@ public class ServerInit .put("s3_zip", s3segmentLoader) .put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config)) .put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config)) + .put("cassandra", new SingleSegmentLoader(new CassandraDataSegmentPuller(new Configuration()), factory, config)) .build() ); @@ -171,6 +176,11 @@ public class ServerInit if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) { return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper); } + else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) { + final CassandraDataSegmentPusherConfig config = configFactory.build(CassandraDataSegmentPusherConfig.class); + + return new CassandraDataSegmentPusher(config, jsonMapper); + } else if (Boolean.parseBoolean(props.getProperty("druid.pusher.hdfs", "false"))) { final HdfsDataSegmentPusherConfig config = configFactory.build(HdfsDataSegmentPusherConfig.class); diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java index 747e1c0328b..b87687b2e90 100644 --- a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java @@ -3,12 +3,7 @@ package com.metamx.druid.loading; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.nio.ByteBuffer; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; @@ -18,20 +13,43 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.utils.CompressionUtils; +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; +import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; +import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider; +import com.netflix.astyanax.recipes.storage.ChunkedStorage; +import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; +import com.netflix.astyanax.recipes.storage.ObjectMetadata; +import com.netflix.astyanax.thrift.ThriftFamilyFactory; /** + * This is the data segment pusher for Cassandra. 
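+ * (Note: the cli names below, indexStorage and descriptorStorage, do not match
+ * the INDEX_TABLE_NAME and DESCRIPTOR_TABLE_NAME constants this class actually
+ * uses, index_storage and descriptor_storage; the names must agree.)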
+ * + * Create the schema in cli with: + * CREATE COLUMN FAMILY indexStorage WITH comparator = UTF8Type AND key_validation_class=UTF8Type + * CREATE COLUMN FAMILY descriptorStorage WITH comparator = UTF8Type AND key_validation_class=UTF8Type */ +// TODO: Auto-create the schema if it does not exist. +// Should we make it so they can specify tables? public class CassandraDataSegmentPusher implements DataSegmentPusher { private static final Logger log = new Logger(CassandraDataSegmentPusher.class); + private static final String CLUSTER_NAME = "druid_cassandra_cluster"; + private static final String INDEX_TABLE_NAME = "index_storage"; + private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage"; + private static final int CONCURRENCY = 10; + private static final Joiner JOINER = Joiner.on("/").skipNulls(); private final CassandraDataSegmentPusherConfig config; - private static final Joiner JOINER = Joiner.on("/").skipNulls(); private final ObjectMapper jsonMapper; - private Cluster cluster; - private Session session; - private String keyspace = null; - private String table = null; + + private Keyspace keyspace; + private AstyanaxContext astyanaxContext; + private ChunkedStorageProvider indexStorage; + private ChunkedStorageProvider descriptorStorage; public CassandraDataSegmentPusher( CassandraDataSegmentPusherConfig config, @@ -39,12 +57,19 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher { this.config = config; this.jsonMapper = jsonMapper; - this.keyspace = this.config.getKeyspace(); - this.table = this.config.getTable(); + this.astyanaxContext = new AstyanaxContext.Builder() + .forCluster(CLUSTER_NAME) + .forKeyspace(config.getKeyspace()) + .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE)) + .withConnectionPoolConfiguration( + new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10) + .setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) + .buildKeyspace(ThriftFamilyFactory.getInstance()); + this.astyanaxContext.start(); + this.keyspace = this.astyanaxContext.getEntity(); - cluster = Cluster.builder().addContactPoints(this.config.getHost()).build(); - session = cluster.connect(); - session.execute("USE " + keyspace); + indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME); + descriptorStorage = new CassandraChunkedStorageProvider(keyspace, DESCRIPTOR_TABLE_NAME); } @Override @@ -65,12 +90,21 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher File descriptorFile = File.createTempFile("druid", "descriptor.json"); Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); - String statement = "INSERT INTO " + keyspace + "." 
+ table + "(key, version, descriptor, index) VALUES (?,?,?,?)"; - PreparedStatement ps = session.prepare(statement); - byte[] indexData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile)); - byte[] descriptorData = ByteStreams.toByteArray(new FileInputStream(compressedIndexFile)); - BoundStatement bs = ps.bind(key, version, descriptorData, indexData); - session.execute(bs); + try + { + ObjectMetadata indexMeta = ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) + .withConcurrencyLevel(CONCURRENCY).call(); + + ObjectMetadata descriptorMeta = ChunkedStorage + .newWriter(descriptorStorage, key, new FileInputStream(descriptorFile)) + .withConcurrencyLevel(CONCURRENCY).call(); + + log.debug("Wrote index to C* [" + indexMeta.getParentPath() + "]"); + log.debug("Wrote descriptor to C* [" + descriptorMeta.getParentPath() + "]"); + } catch (Exception e) + { + throw new IOException(e); + } segment = segment.withSize(indexSize) .withLoadSpec( diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java index 27ab925e2a6..bf3413912a3 100644 --- a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java @@ -30,7 +30,4 @@ public abstract class CassandraDataSegmentPusherConfig @Config("druid.pusher.cassandra.keyspace") public abstract String getKeyspace(); - - @Config("druid.pusher.cassandra.table") - public abstract String getTable(); } From 8e8736291f5407851a03815d8c403218e3de1d20 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Mon, 6 May 2013 23:14:18 -0400 Subject: [PATCH 04/12] Added puller. 
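
Two details worth noting here. ServerInit now registers the loader under the
type "c*", matching what the pusher writes into the segment loadSpec, so
pushed segments can actually be resolved to this puller. Less happily, the new
getSegmentFiles() swallows pull failures: the catch block logs and deletes
outDir but never rethrows, and control then falls through to a log line that
dereferences the still-null ObjectMetadata; the pulled bytes are also written
out as-is, with no unzip step mirroring CompressionUtils.zip on the push side.
A safer shape for the catch block would be (a sketch, reusing the
SegmentLoadingException constructor used elsewhere in this series):

    } catch (Exception e)
    {
      try
      {
        FileUtils.deleteDirectory(outDir);
      } catch (IOException ioe)
      {
        log.error("Couldn't delete directory [" + outDir + "] for cleanup.", ioe);
      }
      throw new SegmentLoadingException(e, "Could not pull segment [%s] from C*", key);
    }
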
--- examples/cassandra/schema/druid_schema.cql | 2 + .../com/metamx/druid/http/ComputeNode.java | 10 +- .../druid/initialization/ServerInit.java | 52 ++++-- .../loading/CassandraDataSegmentKiller.java | 69 ------- .../loading/CassandraDataSegmentPuller.java | 170 ------------------ .../CassandraDataSegmentConfig.java} | 4 +- .../cassandra/CassandraDataSegmentPuller.java | 155 ++++++++++++++++ .../CassandraDataSegmentPusher.java | 51 +++--- .../loading/CassandraDataSegmentLoader.java | 78 -------- 9 files changed, 217 insertions(+), 374 deletions(-) delete mode 100644 server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentKiller.java delete mode 100644 server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPuller.java rename server/src/main/java/com/metamx/druid/loading/{CassandraDataSegmentPusherConfig.java => cassandra/CassandraDataSegmentConfig.java} (91%) create mode 100644 server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java rename server/src/main/java/com/metamx/druid/loading/{ => cassandra}/CassandraDataSegmentPusher.java (74%) delete mode 100644 server/src/test/java/com/metamx/druid/loading/CassandraDataSegmentLoader.java diff --git a/examples/cassandra/schema/druid_schema.cql b/examples/cassandra/schema/druid_schema.cql index e69de29bb2d..a38106d9715 100644 --- a/examples/cassandra/schema/druid_schema.cql +++ b/examples/cassandra/schema/druid_schema.cql @@ -0,0 +1,2 @@ +CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE; +CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE; diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 800b16856c6..f538b69b4db 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -141,17 +141,9 @@ public class ComputeNode extends BaseServerNode private void initializeSegmentLoader() { if (segmentLoader == null) { - final Properties props = getProps(); try { - final RestS3Service s3Client = new RestS3Service( - new AWSCredentials( - PropUtils.getProperty(props, "com.metamx.aws.accessKey"), - PropUtils.getProperty(props, "com.metamx.aws.secretKey") - ) - ); - setSegmentLoader( - ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(SegmentLoaderConfig.class)) + ServerInit.makeDefaultQueryableLoader(getConfigFactory(), getProps()) ); } catch (S3ServiceException e) { diff --git a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java index 21d97540a2e..4c848c88d68 100644 --- a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java +++ b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java @@ -21,6 +21,7 @@ package com.metamx.druid.initialization; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; @@ -41,9 +42,6 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.DruidProcessingConfig; import com.metamx.druid.Query; import com.metamx.druid.collect.StupidPool; -import com.metamx.druid.loading.CassandraDataSegmentPuller; -import com.metamx.druid.loading.CassandraDataSegmentPusher; -import 
com.metamx.druid.loading.CassandraDataSegmentPusherConfig; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DelegatingSegmentLoader; import com.metamx.druid.loading.HdfsDataSegmentPuller; @@ -60,6 +58,9 @@ import com.metamx.druid.loading.S3DataSegmentPusherConfig; import com.metamx.druid.loading.SegmentLoader; import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.loading.SingleSegmentLoader; +import com.metamx.druid.loading.cassandra.CassandraDataSegmentConfig; +import com.metamx.druid.loading.cassandra.CassandraDataSegmentPuller; +import com.metamx.druid.loading.cassandra.CassandraDataSegmentPusher; import com.metamx.druid.query.QueryRunnerFactory; import com.metamx.druid.query.group.GroupByQuery; import com.metamx.druid.query.group.GroupByQueryEngine; @@ -83,27 +84,42 @@ public class ServerInit private static Logger log = new Logger(ServerInit.class); public static SegmentLoader makeDefaultQueryableLoader( - RestS3Service s3Client, - SegmentLoaderConfig config - ) + final ConfigurationObjectFactory configFactory, + final Properties props + ) throws S3ServiceException { + SegmentLoaderConfig config = configFactory.build(SegmentLoaderConfig.class); DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader(); - - final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client); final QueryableIndexFactory factory = new MMappedQueryableIndexFactory(); - SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config); + SingleSegmentLoader s3segmentLoader= null; + try { + final RestS3Service s3Client = new RestS3Service( + new AWSCredentials( + PropUtils.getProperty(props, "com.metamx.aws.accessKey"), + PropUtils.getProperty(props, "com.metamx.aws.secretKey") + ) + ); + final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client); + s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config); + } catch (com.metamx.common.ISE ise){ + log.warn("Could not create s3Client.", ise); + } + Map loaderTypes = new HashMap(); + loaderTypes.put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config)); + loaderTypes.put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config)); + loaderTypes.put("c*", + new SingleSegmentLoader(new CassandraDataSegmentPuller(configFactory.build(CassandraDataSegmentConfig.class)), factory, config)); + + if (s3segmentLoader != null){ + loaderTypes.put("s3", s3segmentLoader); + loaderTypes.put("s3_zip", s3segmentLoader); + } + delegateLoader.setLoaderTypes( - ImmutableMap.builder() - .put("s3", s3segmentLoader) - .put("s3_zip", s3segmentLoader) - .put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config)) - .put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config)) - .put("cassandra", new SingleSegmentLoader(new CassandraDataSegmentPuller(new Configuration()), factory, config)) - .build() + ImmutableMap.copyOf(loaderTypes) ); - return delegateLoader; } @@ -177,7 +193,7 @@ public class ServerInit return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper); } else if (Boolean.parseBoolean(props.getProperty("druid.pusher.cassandra", "false"))) { - final CassandraDataSegmentPusherConfig config = configFactory.build(CassandraDataSegmentPusherConfig.class); + final CassandraDataSegmentConfig config = configFactory.build(CassandraDataSegmentConfig.class); return new 
CassandraDataSegmentPusher(config, jsonMapper); } diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentKiller.java deleted file mode 100644 index e2f6ceb4d3a..00000000000 --- a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentKiller.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.loading; - -import com.google.inject.Inject; -import com.metamx.common.MapUtils; -import com.metamx.common.logger.Logger; -import com.metamx.druid.client.DataSegment; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; - -import java.util.Map; - -/** - */ -public class CassandraDataSegmentKiller implements DataSegmentKiller -{ - private static final Logger log = new Logger(CassandraDataSegmentKiller.class); - - private final RestS3Service s3Client; - - @Inject - public CassandraDataSegmentKiller( - RestS3Service s3Client - ) - { - this.s3Client = s3Client; - } - - @Override - public void kill(DataSegment segment) throws SegmentLoadingException - { - try { - Map loadSpec = segment.getLoadSpec(); - String s3Bucket = MapUtils.getString(loadSpec, "bucket"); - String s3Path = MapUtils.getString(loadSpec, "key"); - String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; - - if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { - log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); - s3Client.deleteObject(s3Bucket, s3Path); - } - if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { - log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); - s3Client.deleteObject(s3Bucket, s3DescriptorPath); - } - } - catch (ServiceException e) { - throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier()); - } - } -} diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPuller.java deleted file mode 100644 index 5d99720f0d4..00000000000 --- a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPuller.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.loading; - -import com.google.common.io.ByteStreams; -import com.google.common.io.Closeables; -import com.google.common.io.Files; -import com.google.inject.Inject; -import com.metamx.common.ISE; -import com.metamx.common.MapUtils; -import com.metamx.common.logger.Logger; -import com.metamx.druid.client.DataSegment; -import com.metamx.druid.common.s3.S3Utils; -import com.metamx.druid.utils.CompressionUtils; -import org.apache.commons.io.FileUtils; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Bucket; -import org.jets3t.service.model.S3Object; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.zip.GZIPInputStream; - -/** - */ -public class CassandraDataSegmentPuller implements DataSegmentPuller -{ - private static final Logger log = new Logger(CassandraDataSegmentPuller.class); - - private static final String BUCKET = "bucket"; - private static final String KEY = "key"; - - private final RestS3Service s3Client; - - @Inject - public CassandraDataSegmentPuller( - RestS3Service s3Client - ) - { - this.s3Client = s3Client; - } - - @Override - public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException - { - S3Coords s3Coords = new S3Coords(segment); - - log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); - - if (!isObjectInBucket(s3Coords)) { - throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords); - } - - if (!outDir.exists()) { - outDir.mkdirs(); - } - - if (!outDir.isDirectory()) { - throw new ISE("outDir[%s] must be a directory.", outDir); - } - - long startTime = System.currentTimeMillis(); - S3Object s3Obj = null; - - try { - s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path); - - InputStream in = null; - try { - in = s3Obj.getDataInputStream(); - final String key = s3Obj.getKey(); - if (key.endsWith(".zip")) { - CompressionUtils.unzip(in, outDir); - } else if (key.endsWith(".gz")) { - final File outFile = new File(outDir, toFilename(key, ".gz")); - ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); - } else { - ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); - } - log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime); - } - catch (IOException e) { - FileUtils.deleteDirectory(outDir); - throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj); - } - finally { - Closeables.closeQuietly(in); - } - } - catch (Exception e) { - throw new SegmentLoadingException(e, e.getMessage()); - } - finally { - S3Utils.closeStreamsQuietly(s3Obj); - } - - } - - private String toFilename(String key, final String suffix) - { - String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/' - filename = filename.substring(0, 
filename.length() - suffix.length()); // remove the suffix from the end - return filename; - } - - private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException - { - try { - return s3Client.isObjectInBucket(coords.bucket, coords.path); - } - catch (ServiceException e) { - throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords); - } - } - - @Override - public long getLastModified(DataSegment segment) throws SegmentLoadingException - { - S3Coords coords = new S3Coords(segment); - try { - S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path); - return objDetails.getLastModifiedDate().getTime(); - } - catch (S3ServiceException e) { - throw new SegmentLoadingException(e, e.getMessage()); - } - } - - private static class S3Coords - { - String bucket; - String path; - - public S3Coords(DataSegment segment) - { - Map loadSpec = segment.getLoadSpec(); - bucket = MapUtils.getString(loadSpec, BUCKET); - path = MapUtils.getString(loadSpec, KEY); - if (path.startsWith("/")) { - path = path.substring(1); - } - } - - public String toString() - { - return String.format("s3://%s/%s", bucket, path); - } - } -} diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java similarity index 91% rename from server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java rename to server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java index bf3413912a3..c370f0c2ecc 100644 --- a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusherConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java @@ -17,13 +17,13 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.loading; +package com.metamx.druid.loading.cassandra; import org.skife.config.Config; /** */ -public abstract class CassandraDataSegmentPusherConfig +public abstract class CassandraDataSegmentConfig { @Config("druid.pusher.cassandra.host") public abstract String getHost(); diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java new file mode 100644 index 00000000000..f6c42a1d54c --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java @@ -0,0 +1,155 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package com.metamx.druid.loading.cassandra; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; + +import com.google.common.io.Files; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.loading.DataSegmentPuller; +import com.metamx.druid.loading.SegmentLoadingException; +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.netflix.astyanax.connectionpool.OperationResult; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; +import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; +import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import com.netflix.astyanax.model.ColumnFamily; +import com.netflix.astyanax.model.ColumnList; +import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider; +import com.netflix.astyanax.recipes.storage.ChunkedStorage; +import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; +import com.netflix.astyanax.recipes.storage.ObjectMetadata; +import com.netflix.astyanax.serializers.StringSerializer; +import com.netflix.astyanax.thrift.ThriftFamilyFactory; + +/** + */ +public class CassandraDataSegmentPuller implements DataSegmentPuller +{ + private static final Logger log = new Logger(CassandraDataSegmentPuller.class); + private static final String CLUSTER_NAME = "druid_cassandra_cluster"; + private static final String INDEX_TABLE_NAME = "index_storage"; + private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage"; + private static final int CONCURRENCY = 10; + private static final int BATCH_SIZE = 10; + + private Keyspace keyspace; + private AstyanaxContext astyanaxContext; + private ChunkedStorageProvider indexStorage; + private ColumnFamily descriptorStorage; + + public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) + { + this.astyanaxContext = new AstyanaxContext.Builder() + .forCluster(CLUSTER_NAME) + .forKeyspace(config.getKeyspace()) + .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE)) + .withConnectionPoolConfiguration( + new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10) + .setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) + .buildKeyspace(ThriftFamilyFactory.getInstance()); + this.astyanaxContext.start(); + this.keyspace = this.astyanaxContext.getEntity(); + + indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME); + + descriptorStorage = new ColumnFamily(DESCRIPTOR_TABLE_NAME, + StringSerializer.get(), StringSerializer.get()); + } + + @Override + public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + { + String key = (String) segment.getLoadSpec().get("key"); + + log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); + + if (!outDir.exists()) + { + outDir.mkdirs(); + } + + if (!outDir.isDirectory()) + { + throw new ISE("outDir[%s] must be a directory.", outDir); + } + + long startTime = System.currentTimeMillis(); + ObjectMetadata meta = null; + try + { + final File outFile = new File(outDir, toFilename(key, ".gz")); + meta = ChunkedStorage + .newReader(indexStorage, key, Files.newOutputStreamSupplier(outFile).getOutput()) + 
.withBatchSize(BATCH_SIZE) + .withConcurrencyLevel(CONCURRENCY) + .call(); + } catch (Exception e) + { + log.error("Could not pull segment [" + key + "] from C*", e); + try + { + FileUtils.deleteDirectory(outDir); + } catch (IOException ioe) + { + log.error("Couldn't delete directory [" + outDir + "] for cleanup.", ioe); + } + } + + log.info("Pull of file[%s] completed in %,d millis (%s chunks)", key, System.currentTimeMillis() - startTime, + meta.getChunkSize()); + + } + + @Override + public long getLastModified(DataSegment segment) throws SegmentLoadingException + { + String key = (String) segment.getLoadSpec().get("key"); + OperationResult> result; + try + { + result = this.keyspace.prepareQuery(descriptorStorage) + .getKey(key) + .execute(); + ColumnList children = result.getResult(); + long lastModified = children.getColumnByName("lastmodified").getLongValue(); + log.info("Read lastModified for [" + key + "] as [" + lastModified + "]"); + return lastModified; + } catch (ConnectionException e) + { + throw new SegmentLoadingException(e, e.getMessage()); + } + } + + private String toFilename(String key, final String suffix) + { + String filename = key.substring(key.lastIndexOf("/") + 1); + filename = filename.substring(0, filename.length() - suffix.length()); + return filename; + } +} diff --git a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java similarity index 74% rename from server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java rename to server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java index b87687b2e90..525ff969780 100644 --- a/server/src/main/java/com/metamx/druid/loading/CassandraDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java @@ -1,4 +1,4 @@ -package com.metamx.druid.loading; +package com.metamx.druid.loading.cassandra; import java.io.File; import java.io.FileInputStream; @@ -7,30 +7,29 @@ import java.io.IOException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; -import com.google.common.io.ByteStreams; -import com.google.common.io.Files; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.loading.DataSegmentPusherUtil; import com.metamx.druid.utils.CompressionUtils; import com.netflix.astyanax.AstyanaxContext; import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.NodeDiscoveryType; import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import com.netflix.astyanax.model.ColumnFamily; import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider; import com.netflix.astyanax.recipes.storage.ChunkedStorage; import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; import com.netflix.astyanax.recipes.storage.ObjectMetadata; +import com.netflix.astyanax.serializers.StringSerializer; import com.netflix.astyanax.thrift.ThriftFamilyFactory; /** * This is the data segment pusher for Cassandra. 
- * - * Create the schema in cli with: - * CREATE COLUMN FAMILY indexStorage WITH comparator = UTF8Type AND key_validation_class=UTF8Type - * CREATE COLUMN FAMILY descriptorStorage WITH comparator = UTF8Type AND key_validation_class=UTF8Type */ // TODO: Auto-create the schema if it does not exist. // Should we make it so they can specify tables? @@ -41,18 +40,17 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher private static final String INDEX_TABLE_NAME = "index_storage"; private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage"; private static final int CONCURRENCY = 10; - private static final Joiner JOINER = Joiner.on("/").skipNulls(); - - private final CassandraDataSegmentPusherConfig config; + private static final Joiner JOINER = Joiner.on("/").skipNulls(); + private final CassandraDataSegmentConfig config; private final ObjectMapper jsonMapper; private Keyspace keyspace; private AstyanaxContext astyanaxContext; private ChunkedStorageProvider indexStorage; - private ChunkedStorageProvider descriptorStorage; + private ColumnFamily descriptorStorage; public CassandraDataSegmentPusher( - CassandraDataSegmentPusherConfig config, + CassandraDataSegmentConfig config, ObjectMapper jsonMapper) { this.config = config; @@ -67,9 +65,11 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher .buildKeyspace(ThriftFamilyFactory.getInstance()); this.astyanaxContext.start(); this.keyspace = this.astyanaxContext.getEntity(); - + + descriptorStorage = new ColumnFamily(DESCRIPTOR_TABLE_NAME, + StringSerializer.get(), StringSerializer.get()); + indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME); - descriptorStorage = new CassandraChunkedStorageProvider(keyspace, DESCRIPTOR_TABLE_NAME); } @Override @@ -86,21 +86,18 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile); int version = IndexIO.getVersionFromDir(indexFilesDir); - // Create descriptor - File descriptorFile = File.createTempFile("druid", "descriptor.json"); - Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); - try { ObjectMetadata indexMeta = ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) .withConcurrencyLevel(CONCURRENCY).call(); - - ObjectMetadata descriptorMeta = ChunkedStorage - .newWriter(descriptorStorage, key, new FileInputStream(descriptorFile)) - .withConcurrencyLevel(CONCURRENCY).call(); - - log.debug("Wrote index to C* [" + indexMeta.getParentPath() + "]"); - log.debug("Wrote descriptor to C* [" + descriptorMeta.getParentPath() + "]"); + byte[] json = jsonMapper.writeValueAsBytes(segment); + //CassandraDataSegmentDescriptor descriptor = new CassandraDataSegmentDescriptor(segment, json); + MutationBatch mutation = this.keyspace.prepareMutationBatch(); + mutation.withRow(descriptorStorage, key) + .putColumn("lastmodified", System.currentTimeMillis(), null) + .putColumn("descriptor", json, null); + mutation.execute(); + log.info("Wrote index to C* [" + indexMeta.getParentPath() + "]"); } catch (Exception e) { throw new IOException(e); @@ -110,12 +107,10 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher .withLoadSpec( ImmutableMap. 
of("type", "c*", "key", key) ) - .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir)); + .withBinaryVersion(version); log.info("Deleting zipped index File[%s]", compressedIndexFile); compressedIndexFile.delete(); - log.info("Deleting descriptor file[%s]", descriptorFile); - descriptorFile.delete(); return segment; } } diff --git a/server/src/test/java/com/metamx/druid/loading/CassandraDataSegmentLoader.java b/server/src/test/java/com/metamx/druid/loading/CassandraDataSegmentLoader.java deleted file mode 100644 index dbf25d480f7..00000000000 --- a/server/src/test/java/com/metamx/druid/loading/CassandraDataSegmentLoader.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.loading; - -import com.metamx.common.MapUtils; -import com.metamx.druid.StorageAdapter; -import com.metamx.druid.client.DataSegment; -import com.metamx.druid.index.QueryableIndex; -import com.metamx.druid.index.Segment; -import org.joda.time.Interval; - -import java.io.File; -import java.util.Map; - -/** -*/ -public class CassandraDataSegmentLoader implements SegmentLoader -{ - @Override - public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException - { - Map loadSpec = segment.getLoadSpec(); - return new File(MapUtils.getString(loadSpec, "cacheDir")).exists(); - } - - @Override - public Segment getSegment(final DataSegment segment) throws SegmentLoadingException - { - return new Segment() - { - @Override - public String getIdentifier() - { - return segment.getIdentifier(); - } - - @Override - public Interval getDataInterval() - { - return segment.getInterval(); - } - - @Override - public QueryableIndex asQueryableIndex() - { - throw new UnsupportedOperationException(); - } - - @Override - public StorageAdapter asStorageAdapter() - { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public void cleanup(DataSegment loadSpec) throws SegmentLoadingException - { - } -} From 17835e6e08eaf4ceb01d41b9c5516a034111f3d5 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Mon, 6 May 2013 23:41:58 -0400 Subject: [PATCH 05/12] Examples for the cassandra storage. 
--- examples/cassandra/client.sh | 1 + examples/cassandra/query | 19 +++++++++++++++++++ examples/cassandra/query2 | 11 +++++++++++ 3 files changed, 31 insertions(+) create mode 100755 examples/cassandra/client.sh create mode 100644 examples/cassandra/query create mode 100644 examples/cassandra/query2 diff --git a/examples/cassandra/client.sh b/examples/cassandra/client.sh new file mode 100755 index 00000000000..62245719f1b --- /dev/null +++ b/examples/cassandra/client.sh @@ -0,0 +1 @@ +curl -sX POST "http://localhost:7070/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query diff --git a/examples/cassandra/query b/examples/cassandra/query new file mode 100644 index 00000000000..09aa4d3ce5d --- /dev/null +++ b/examples/cassandra/query @@ -0,0 +1,19 @@ +{ + "queryType": "groupBy", + "dataSource": "randSeq", + "granularity": "all", + "dimensions": [], + "aggregations":[ + { "type": "count", "name": "rows"}, + { "type": "doubleSum", "fieldName": "events", "name": "e"}, + { "type": "doubleSum", "fieldName": "outColumn", "name": "randomNumberSum"} + ], + "postAggregations":[ + { "type":"arithmetic", + "name":"avg_random", + "fn":"/", + "fields":[ {"type":"fieldAccess","name":"randomNumberSum","fieldName":"randomNumberSum"}, + {"type":"fieldAccess","name":"rows","fieldName":"rows"} ]} + ], + "intervals":["2012-10-01T00:00/2020-01-01T00"] + } diff --git a/examples/cassandra/query2 b/examples/cassandra/query2 new file mode 100644 index 00000000000..c53a943f899 --- /dev/null +++ b/examples/cassandra/query2 @@ -0,0 +1,11 @@ +{ + "queryType": "groupBy", + "dataSource": "appevents", + "granularity": "all", + "dimensions": ["appid", "event"], + "aggregations":[ + {"type":"count", "name":"eventcount"}, + {"type":"doubleSum", "fieldName":"events", "name":"eventssum"} + ], + "intervals":["2012-10-01T00:00/2020-01-01T00"] +} From 41e9f8fcb4f3e240d7b9b0f2309012b472a01b0d Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Tue, 7 May 2013 11:35:14 -0400 Subject: [PATCH 06/12] Working Push & Pull. 
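The pull path now mirrors the push path: the pusher streams the zipped segment into the `index_storage` column family with the Astyanax ChunkedStorage recipe and writes the descriptor plus a `lastmodified` column to `descriptor_storage`; the puller streams the chunks back into an `index.zip` under the segment cache directory and unzips it in place, with the Astyanax connection setup moving into the new CassandraStorage superclass. For reference, a minimal sketch of the chunked round trip both sides build on -- the class and method names are illustrative, error handling and cleanup are omitted, and `keyspace` is assumed to come from a started AstyanaxContext as in CassandraStorage:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.OutputStream;
    import com.google.common.io.Files;
    import com.netflix.astyanax.Keyspace;
    import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
    import com.netflix.astyanax.recipes.storage.ChunkedStorage;
    import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
    import com.netflix.astyanax.recipes.storage.ObjectMetadata;

    class ChunkedRoundTrip
    {
      static void roundTrip(Keyspace keyspace, String key, File zippedSegment, File outDir) throws Exception
      {
        // both sides share one chunked-object provider over the index_storage column family
        ChunkedStorageProvider index = new CassandraChunkedStorageProvider(keyspace, "index_storage");

        // push: split the zip into chunks and write them with 10 concurrent threads
        ChunkedStorage.newWriter(index, key, new FileInputStream(zippedSegment))
                      .withConcurrencyLevel(10).call();

        // pull: stream the chunks back out into index.zip in the cache directory
        OutputStream os = Files.newOutputStreamSupplier(new File(outDir, "index.zip")).getOutput();
        ObjectMetadata meta = ChunkedStorage.newReader(index, key, os)
                                            .withBatchSize(10).withConcurrencyLevel(10).call();
        os.close();
        // the real puller then unzips index.zip into outDir via CompressionUtils.unzip(...)
      }
    }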
--- examples/cassandra/client.sh | 2 +- .../com/metamx/druid/http/InfoResource.java | 1 + .../cassandra/CassandraDataSegmentConfig.java | 3 + .../cassandra/CassandraDataSegmentPuller.java | 82 +++++-------------- .../cassandra/CassandraDataSegmentPusher.java | 13 +-- .../loading/cassandra/CassandraStorage.java | 71 ++++++++++++++++ 6 files changed, 104 insertions(+), 68 deletions(-) create mode 100644 server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java diff --git a/examples/cassandra/client.sh b/examples/cassandra/client.sh index 62245719f1b..5ecb6d6cede 100755 --- a/examples/cassandra/client.sh +++ b/examples/cassandra/client.sh @@ -1 +1 @@ -curl -sX POST "http://localhost:7070/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query +curl -sX POST "http://localhost:9090/druid/v2/?pretty=true" -H 'content-type: application/json' -d @query diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java index ff5b5ae58d7..18e9e68e4c4 100644 --- a/server/src/main/java/com/metamx/druid/http/InfoResource.java +++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java @@ -72,6 +72,7 @@ public class InfoResource InventoryView serverInventoryView, DatabaseSegmentManager databaseSegmentManager, DatabaseRuleManager databaseRuleManager, + @Nullable IndexingServiceClient indexingServiceClient ) { diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java index c370f0c2ecc..d2b9e2cbe83 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentConfig.java @@ -22,6 +22,9 @@ package com.metamx.druid.loading.cassandra; import org.skife.config.Config; /** + * Cassandra Config + * + * @author boneill42 */ public abstract class CassandraDataSegmentConfig { diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java index f6c42a1d54c..823fa46ce24 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java @@ -20,66 +20,36 @@ package com.metamx.druid.loading.cassandra; import java.io.File; -import java.io.IOException; - -import org.apache.commons.io.FileUtils; +import java.io.OutputStream; +import com.google.common.base.Throwables; import com.google.common.io.Files; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.loading.DataSegmentPuller; import com.metamx.druid.loading.SegmentLoadingException; -import com.netflix.astyanax.AstyanaxContext; -import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.metamx.druid.utils.CompressionUtils; import com.netflix.astyanax.connectionpool.OperationResult; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; -import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; -import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; -import com.netflix.astyanax.model.ColumnFamily; 
import com.netflix.astyanax.model.ColumnList; -import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider; import com.netflix.astyanax.recipes.storage.ChunkedStorage; -import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; import com.netflix.astyanax.recipes.storage.ObjectMetadata; -import com.netflix.astyanax.serializers.StringSerializer; -import com.netflix.astyanax.thrift.ThriftFamilyFactory; /** + * Cassandra Segment Puller + * + * @author boneill42 */ -public class CassandraDataSegmentPuller implements DataSegmentPuller +public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller { private static final Logger log = new Logger(CassandraDataSegmentPuller.class); - private static final String CLUSTER_NAME = "druid_cassandra_cluster"; - private static final String INDEX_TABLE_NAME = "index_storage"; - private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage"; private static final int CONCURRENCY = 10; private static final int BATCH_SIZE = 10; - private Keyspace keyspace; - private AstyanaxContext astyanaxContext; - private ChunkedStorageProvider indexStorage; - private ColumnFamily descriptorStorage; - - public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) + public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) { - this.astyanaxContext = new AstyanaxContext.Builder() - .forCluster(CLUSTER_NAME) - .forKeyspace(config.getKeyspace()) - .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE)) - .withConnectionPoolConfiguration( - new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10) - .setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) - .buildKeyspace(ThriftFamilyFactory.getInstance()); - this.astyanaxContext.start(); - this.keyspace = this.astyanaxContext.getEntity(); - - indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME); - - descriptorStorage = new ColumnFamily(DESCRIPTOR_TABLE_NAME, - StringSerializer.get(), StringSerializer.get()); + super(config); } @Override @@ -103,27 +73,22 @@ public class CassandraDataSegmentPuller implements DataSegmentPuller ObjectMetadata meta = null; try { - final File outFile = new File(outDir, toFilename(key, ".gz")); + final File outFile = new File(outDir, "index.zip"); + log.info("Writing to [" + outFile.getAbsolutePath() + "]"); + OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput(); meta = ChunkedStorage - .newReader(indexStorage, key, Files.newOutputStreamSupplier(outFile).getOutput()) + .newReader(indexStorage, key, os) .withBatchSize(BATCH_SIZE) .withConcurrencyLevel(CONCURRENCY) - .call(); + .call(); + os.close(); + CompressionUtils.unzip(outFile, outDir); } catch (Exception e) { - log.error("Could not pull segment [" + key + "] from C*", e); - try - { - FileUtils.deleteDirectory(outDir); - } catch (IOException ioe) - { - log.error("Couldn't delete directory [" + outDir + "] for cleanup.", ioe); - } - } - - log.info("Pull of file[%s] completed in %,d millis (%s chunks)", key, System.currentTimeMillis() - startTime, - meta.getChunkSize()); - + throw Throwables.propagate(e); + } + log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime, + meta.getObjectSize()); } @Override @@ -145,11 +110,4 @@ public class CassandraDataSegmentPuller implements DataSegmentPuller throw new SegmentLoadingException(e, e.getMessage()); } } - - private String 
toFilename(String key, final String suffix) - { - String filename = key.substring(key.lastIndexOf("/") + 1); - filename = filename.substring(0, filename.length() - suffix.length()); - return filename; - } } diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java index 525ff969780..7bd1f897347 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java @@ -24,12 +24,13 @@ import com.netflix.astyanax.model.ColumnFamily; import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider; import com.netflix.astyanax.recipes.storage.ChunkedStorage; import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; -import com.netflix.astyanax.recipes.storage.ObjectMetadata; import com.netflix.astyanax.serializers.StringSerializer; import com.netflix.astyanax.thrift.ThriftFamilyFactory; /** - * This is the data segment pusher for Cassandra. + * Cassandra Segment Pusher + * + * @author boneill42 */ // TODO: Auto-create the schema if it does not exist. // Should we make it so they can specify tables? @@ -84,20 +85,22 @@ public class CassandraDataSegmentPusher implements DataSegmentPusher // Create index final File compressedIndexFile = File.createTempFile("druid", "index.zip"); long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile); + log.info("Wrote compressed file [%s] to [%s]", compressedIndexFile.getAbsolutePath(), key); + int version = IndexIO.getVersionFromDir(indexFilesDir); try { - ObjectMetadata indexMeta = ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) + long start = System.currentTimeMillis(); + ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) .withConcurrencyLevel(CONCURRENCY).call(); byte[] json = jsonMapper.writeValueAsBytes(segment); - //CassandraDataSegmentDescriptor descriptor = new CassandraDataSegmentDescriptor(segment, json); MutationBatch mutation = this.keyspace.prepareMutationBatch(); mutation.withRow(descriptorStorage, key) .putColumn("lastmodified", System.currentTimeMillis(), null) .putColumn("descriptor", json, null); mutation.execute(); - log.info("Wrote index to C* [" + indexMeta.getParentPath() + "]"); + log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start); } catch (Exception e) { throw new IOException(e); diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java new file mode 100644 index 00000000000..e770af69c59 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java @@ -0,0 +1,71 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.loading.cassandra; + +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; +import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; +import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import com.netflix.astyanax.model.ColumnFamily; +import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider; +import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; +import com.netflix.astyanax.serializers.StringSerializer; +import com.netflix.astyanax.thrift.ThriftFamilyFactory; + +/** + * Superclass for accessing Cassandra Storage. + * + * This is the schema used to support the index and descriptor storage: + * + * CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE; + * CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE; + */ +public class CassandraStorage +{ + private static final String CLUSTER_NAME = "druid_cassandra_cluster"; + private static final String INDEX_TABLE_NAME = "index_storage"; + private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage"; + + private AstyanaxContext astyanaxContext; + Keyspace keyspace; + ChunkedStorageProvider indexStorage; + ColumnFamily descriptorStorage; + + public CassandraStorage(CassandraDataSegmentConfig config) + { + this.astyanaxContext = new AstyanaxContext.Builder() + .forCluster(CLUSTER_NAME) + .forKeyspace(config.getKeyspace()) + .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE)) + .withConnectionPoolConfiguration( + new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10) + .setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) + .buildKeyspace(ThriftFamilyFactory.getInstance()); + this.astyanaxContext.start(); + this.keyspace = this.astyanaxContext.getEntity(); + + indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME); + + descriptorStorage = new ColumnFamily(DESCRIPTOR_TABLE_NAME, + StringSerializer.get(), StringSerializer.get()); + } +} From 10a96626d47be26c95f035dbc31f61f633e60018 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Tue, 7 May 2013 16:53:12 -0400 Subject: [PATCH 07/12] Documentation for C* --- examples/cassandra/README.md | 32 +++++++++++++++++++ examples/cassandra/query2 | 11 ------- .../examples/RealtimeStandaloneMain.java | 10 +++--- 3 files changed, 37 insertions(+), 16 deletions(-) create mode 100644 examples/cassandra/README.md delete mode 100644 examples/cassandra/query2 diff --git a/examples/cassandra/README.md b/examples/cassandra/README.md new file mode 100644 index 00000000000..8f34c805d00 --- /dev/null +++ b/examples/cassandra/README.md @@ -0,0 +1,32 @@ +## Introduction +Druid can use Cassandra as a deep storage mechanism. Segments and their metadata are stored in Cassandra in two tables: +`index_storage` and `descriptor_storage`. Underneath the hood, the Cassandra integration leverages Astyanax. 
The +index storage table is a [Chunked Object](https://github.com/Netflix/astyanax/wiki/Chunked-Object-Store) repository. It contains +compressed segments for distribution to real-time and compute nodes. Since segments can be large, the Chunked Object storage allows the integration to multi-thread +the write to Cassandra, and spreads the data across all the nodes in a cluster. The descriptor storage table is a normal C* table that +stores the segment metadatak. + +## Schema +Below are the create statements for each: + + + + CREATE TABLE index_storage ( key text, chunk text, value blob, PRIMARY KEY (key, chunk)) WITH COMPACT STORAGE; + + CREATE TABLE descriptor_storage ( key varchar, lastModified timestamp, descriptor varchar, PRIMARY KEY (key) ) WITH COMPACT STORAGE; + + +## Getting Started +First create the schema above. (I use a new keyspace called `druid`) + +Then, add the following properties to your properties file to enable a Cassandra +backend. + + druid.pusher.cassandra=true + druid.pusher.cassandra.host=localhost:9160 + druid.pusher.cassandra.keyspace=druid + +Use the `druid-development@googlegroups.com` mailing list if you have questions, +or feel free to reach out directly: `bone@alumni.brown.edu`. + + diff --git a/examples/cassandra/query2 b/examples/cassandra/query2 deleted file mode 100644 index c53a943f899..00000000000 --- a/examples/cassandra/query2 +++ /dev/null @@ -1,11 +0,0 @@ -{ - "queryType": "groupBy", - "dataSource": "appevents", - "granularity": "all", - "dimensions": ["appid", "event"], - "aggregations":[ - {"type":"count", "name":"eventcount"}, - {"type":"doubleSum", "fieldName":"events", "name":"eventssum"} - ], - "intervals":["2012-10-01T00:00/2020-01-01T00"] -} diff --git a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java index 10edff548d7..4cb267f571f 100644 --- a/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/src/main/java/druid/examples/RealtimeStandaloneMain.java @@ -45,11 +45,11 @@ public class RealtimeStandaloneMain ); // Create dummy objects for the various interfaces that interact with the DB, ZK and deep storage - //rn.setSegmentPublisher(new NoopSegmentPublisher()); - //rn.setAnnouncer(new NoopDataSegmentAnnouncer()); - //rn.setDataSegmentPusher(new NoopDataSegmentPusher()); - //rn.setServerView(new NoopServerView()); - //rn.setInventoryView(new NoopInventoryView()); + rn.setSegmentPublisher(new NoopSegmentPublisher()); + rn.setAnnouncer(new NoopDataSegmentAnnouncer()); + rn.setDataSegmentPusher(new NoopDataSegmentPusher()); + rn.setServerView(new NoopServerView()); + rn.setInventoryView(new NoopInventoryView()); Runtime.getRuntime().addShutdownHook( new Thread( From 3eb0f4dfcab1f89c6a829a3a136efa43607c54ed Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Wed, 8 May 2013 15:50:58 -0400 Subject: [PATCH 08/12] Fix hierarchy in Pusher (to use CassandraStorage superclass) --- .../cassandra/CassandraDataSegmentPusher.java | 40 ++----------------- .../loading/cassandra/CassandraStorage.java | 9 +++-- 2 files changed, 8 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java index 7bd1f897347..202ace8994f 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java +++ 
b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java @@ -13,64 +13,30 @@ import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DataSegmentPusherUtil; import com.metamx.druid.utils.CompressionUtils; -import com.netflix.astyanax.AstyanaxContext; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.connectionpool.NodeDiscoveryType; -import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; -import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; -import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; import com.netflix.astyanax.model.ColumnFamily; -import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider; import com.netflix.astyanax.recipes.storage.ChunkedStorage; import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; -import com.netflix.astyanax.serializers.StringSerializer; -import com.netflix.astyanax.thrift.ThriftFamilyFactory; /** * Cassandra Segment Pusher * * @author boneill42 */ -// TODO: Auto-create the schema if it does not exist. -// Should we make it so they can specify tables? -public class CassandraDataSegmentPusher implements DataSegmentPusher +public class CassandraDataSegmentPusher extends CassandraStorage implements DataSegmentPusher { private static final Logger log = new Logger(CassandraDataSegmentPusher.class); - private static final String CLUSTER_NAME = "druid_cassandra_cluster"; - private static final String INDEX_TABLE_NAME = "index_storage"; - private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage"; private static final int CONCURRENCY = 10; private static final Joiner JOINER = Joiner.on("/").skipNulls(); - private final CassandraDataSegmentConfig config; private final ObjectMapper jsonMapper; - private Keyspace keyspace; - private AstyanaxContext astyanaxContext; - private ChunkedStorageProvider indexStorage; - private ColumnFamily descriptorStorage; - public CassandraDataSegmentPusher( CassandraDataSegmentConfig config, ObjectMapper jsonMapper) { - this.config = config; - this.jsonMapper = jsonMapper; - this.astyanaxContext = new AstyanaxContext.Builder() - .forCluster(CLUSTER_NAME) - .forKeyspace(config.getKeyspace()) - .withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.NONE)) - .withConnectionPoolConfiguration( - new ConnectionPoolConfigurationImpl("MyConnectionPool").setMaxConnsPerHost(10) - .setSeeds(config.getHost())).withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) - .buildKeyspace(ThriftFamilyFactory.getInstance()); - this.astyanaxContext.start(); - this.keyspace = this.astyanaxContext.getEntity(); - - descriptorStorage = new ColumnFamily(DESCRIPTOR_TABLE_NAME, - StringSerializer.get(), StringSerializer.get()); - - indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME); + super(config); + this.jsonMapper=jsonMapper; } @Override diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java index e770af69c59..e9d04609135 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraStorage.java @@ -46,9 +46,10 @@ public class CassandraStorage private static final String DESCRIPTOR_TABLE_NAME = "descriptor_storage"; 
private AstyanaxContext astyanaxContext; - Keyspace keyspace; - ChunkedStorageProvider indexStorage; - ColumnFamily descriptorStorage; + final Keyspace keyspace; + final ChunkedStorageProvider indexStorage; + final ColumnFamily descriptorStorage; + final CassandraDataSegmentConfig config; public CassandraStorage(CassandraDataSegmentConfig config) { @@ -62,7 +63,7 @@ public class CassandraStorage .buildKeyspace(ThriftFamilyFactory.getInstance()); this.astyanaxContext.start(); this.keyspace = this.astyanaxContext.getEntity(); - + this.config = config; indexStorage = new CassandraChunkedStorageProvider(keyspace, INDEX_TABLE_NAME); descriptorStorage = new ColumnFamily(DESCRIPTOR_TABLE_NAME, From 8e0c0e6d47c1e45270ba9e5357094270b4c1e28d Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Wed, 8 May 2013 16:02:11 -0400 Subject: [PATCH 09/12] Switched to default the aws properties. --- .../druid/initialization/ServerInit.java | 41 +++++++------------ 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java index 4c848c88d68..0ac5e67a4de 100644 --- a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java +++ b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java @@ -21,7 +21,6 @@ package com.metamx.druid.initialization; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; @@ -92,33 +91,23 @@ public class ServerInit DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader(); final QueryableIndexFactory factory = new MMappedQueryableIndexFactory(); - SingleSegmentLoader s3segmentLoader= null; - try { - final RestS3Service s3Client = new RestS3Service( - new AWSCredentials( - PropUtils.getProperty(props, "com.metamx.aws.accessKey"), - PropUtils.getProperty(props, "com.metamx.aws.secretKey") - ) - ); - final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client); - s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config); - } catch (com.metamx.common.ISE ise){ - log.warn("Could not create s3Client.", ise); - } - - Map loaderTypes = new HashMap(); - loaderTypes.put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config)); - loaderTypes.put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config)); - loaderTypes.put("c*", - new SingleSegmentLoader(new CassandraDataSegmentPuller(configFactory.build(CassandraDataSegmentConfig.class)), factory, config)); - - if (s3segmentLoader != null){ - loaderTypes.put("s3", s3segmentLoader); - loaderTypes.put("s3_zip", s3segmentLoader); - } + final RestS3Service s3Client = new RestS3Service( + new AWSCredentials( + props.getProperty("com.metamx.aws.accessKey", ""), + props.getProperty("com.metamx.aws.secretKey", "") + ) + ); + final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client); + final SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config); delegateLoader.setLoaderTypes( - ImmutableMap.copyOf(loaderTypes) + ImmutableMap.builder() + .put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config)) + .put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config)) + .put("s3", s3segmentLoader) + .put("s3_zip", 
s3segmentLoader) + .put("c*",new SingleSegmentLoader(new CassandraDataSegmentPuller(configFactory.build(CassandraDataSegmentConfig.class)), factory, config)) + .build() ); return delegateLoader; } } From 23998f3f01229984c079a230c173758e1ad9cf89 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Thu, 16 May 2013 13:04:46 -0400 Subject: [PATCH 10/12] - Added cleanup to the puller. - Edited the documentation to remove reference to real-time node. --- examples/cassandra/README.md | 2 +- .../cassandra/CassandraDataSegmentPuller.java | 38 ++++++++++++-------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/examples/cassandra/README.md b/examples/cassandra/README.md index 8f34c805d00..addc35d9ba7 100644 --- a/examples/cassandra/README.md +++ b/examples/cassandra/README.md @@ -2,7 +2,7 @@ Druid can use Cassandra as a deep storage mechanism. Segments and their metadata are stored in Cassandra in two tables: `index_storage` and `descriptor_storage`. Underneath the hood, the Cassandra integration leverages Astyanax. The index storage table is a [Chunked Object](https://github.com/Netflix/astyanax/wiki/Chunked-Object-Store) repository. It contains -compressed segments for distribution to real-time and compute nodes. Since segments can be large, the Chunked Object storage allows the integration to multi-thread +compressed segments for distribution to compute nodes. Since segments can be large, the Chunked Object storage allows the integration to multi-thread the write to Cassandra, and spreads the data across all the nodes in a cluster. The descriptor storage table is a normal C* table that stores the segment metadata. diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java index 823fa46ce24..3a0277a65df 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java @@ -22,6 +22,8 @@ package com.metamx.druid.loading.cassandra; import java.io.File; import java.io.OutputStream; +import org.apache.commons.io.FileUtils; + import com.google.common.base.Throwables; import com.google.common.io.Files; import com.metamx.common.ISE; @@ -41,13 +43,13 @@ import com.netflix.astyanax.recipes.storage.ObjectMetadata; * * @author boneill42 */ -public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller +public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller { private static final Logger log = new Logger(CassandraDataSegmentPuller.class); private static final int CONCURRENCY = 10; private static final int BATCH_SIZE = 10; - public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) + public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) { super(config); } @@ -56,7 +58,6 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements Data public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException { String key = (String) segment.getLoadSpec().get("key"); - log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); if (!outDir.exists()) @@ -71,22 +72,29 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements Data long startTime = System.currentTimeMillis(); ObjectMetadata meta = null; + final File outFile = new File(outDir, "index.zip"); try { - final File outFile =
new File(outDir, "index.zip"); - log.info("Writing to [" + outFile.getAbsolutePath() + "]"); - OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput(); - meta = ChunkedStorage - .newReader(indexStorage, key, os) - .withBatchSize(BATCH_SIZE) - .withConcurrencyLevel(CONCURRENCY) - .call(); - os.close(); - CompressionUtils.unzip(outFile, outDir); + try + { + log.info("Writing to [" + outFile.getAbsolutePath() + "]"); + OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput(); + meta = ChunkedStorage + .newReader(indexStorage, key, os) + .withBatchSize(BATCH_SIZE) + .withConcurrencyLevel(CONCURRENCY) + .call(); + os.close(); + CompressionUtils.unzip(outFile, outDir); + } catch (Exception e) + { + FileUtils.deleteDirectory(outDir); + throw e; + } } catch (Exception e) { - throw Throwables.propagate(e); - } + throw new SegmentLoadingException(e, e.getMessage()); + } log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime, meta.getObjectSize()); } From 61c014c49fd9862446445772146b8d54b4c66cd6 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Thu, 16 May 2013 13:33:39 -0400 Subject: [PATCH 11/12] Cleaned up imports. --- .../druid/loading/cassandra/CassandraDataSegmentPuller.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java index 3a0277a65df..01af22c096b 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java @@ -24,7 +24,6 @@ import java.io.OutputStream; import org.apache.commons.io.FileUtils; -import com.google.common.base.Throwables; import com.google.common.io.Files; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; From c8ff5ca2fbaf00d7b0f586ec3cf438b4c5200740 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Fri, 17 May 2013 12:54:03 -0400 Subject: [PATCH 12/12] Using string formatter in log statements. Clean up of imports on the pusher.
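The metamx Logger accepts String.format-style arguments, so a message and its parameters stay separate instead of being concatenated eagerly (this assumes the logger defers formatting until the statement is actually emitted, which is the usual motivation for the style). Both forms below are taken from this change:

    // before: eager string concatenation
    log.info("Read lastModified for [" + key + "] as [" + lastModified + "]");

    // after: format string plus arguments
    log.info("Read lastModified for [%s] as [%d]", key, lastModified);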
--- .../druid/loading/cassandra/CassandraDataSegmentPuller.java | 4 ++-- .../druid/loading/cassandra/CassandraDataSegmentPusher.java | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java index 01af22c096b..87e6105f161 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPuller.java @@ -76,7 +76,7 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements Data { try { - log.info("Writing to [" + outFile.getAbsolutePath() + "]"); + log.info("Writing to [%s]", outFile.getAbsolutePath()); OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput(); meta = ChunkedStorage .newReader(indexStorage, key, os) @@ -109,7 +109,7 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements Data .execute(); ColumnList children = result.getResult(); long lastModified = children.getColumnByName("lastmodified").getLongValue(); - log.info("Read lastModified for [" + key + "] as [" + lastModified + "]"); + log.info("Read lastModified for [%s] as [%d]", key, lastModified); return lastModified; } catch (ConnectionException e) { diff --git a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java index 202ace8994f..57bc72b9124 100644 --- a/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/cassandra/CassandraDataSegmentPusher.java @@ -13,11 +13,8 @@ import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DataSegmentPusherUtil; import com.metamx.druid.utils.CompressionUtils; -import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.model.ColumnFamily; import com.netflix.astyanax.recipes.storage.ChunkedStorage; -import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider; /** * Cassandra Segment Pusher