- Added cleanup to the puller.

- Edited the documentation to remove reference to real-time node.
Brian O'Neill 2013-05-16 13:04:46 -04:00
parent 8e0c0e6d47
commit 23998f3f01
2 changed files with 23 additions and 16 deletions

View File

@@ -2,7 +2,7 @@
 Druid can use Cassandra as a deep storage mechanism. Segments and their metadata are stored in Cassandra in two tables:
 `index_storage` and `descriptor_storage`. Under the hood, the Cassandra integration leverages Astyanax. The
 index storage table is a [Chunked Object](https://github.com/Netflix/astyanax/wiki/Chunked-Object-Store) repository. It contains
-compressed segments for distribution to real-time and compute nodes. Since segments can be large, the Chunked Object storage allows the integration to multi-thread
+compressed segments for distribution to compute nodes. Since segments can be large, the Chunked Object storage allows the integration to multi-thread
 the write to Cassandra, and spreads the data across all the nodes in a cluster. The descriptor storage table is a normal C* table that
 stores the segment metadata.
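For readers new to the recipe, here is a minimal sketch of what writes and reads against the `index_storage` table look like through Astyanax's Chunked Object Store; the keyspace handle, object key, file names, and chunk size are illustrative assumptions, not the integration's actual configuration:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.recipes.storage.CassandraChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import com.netflix.astyanax.recipes.storage.ObjectMetadata;

public class ChunkedObjectStoreSketch
{
    public static void roundTrip(Keyspace keyspace) throws Exception
    {
        // "index_storage" is the segment table described above.
        ChunkedStorageProvider indexStorage =
            new CassandraChunkedStorageProvider(keyspace, "index_storage");

        // Write: the segment zip is split into chunks and spread across the cluster.
        ObjectMetadata written = ChunkedStorage
            .newWriter(indexStorage, "example_segment_key", new FileInputStream("index.zip"))
            .withChunkSize(0x40000) // illustrative 256 KB chunks
            .call();

        // Read: chunks are fetched in batches and reassembled into the output stream.
        ObjectMetadata read = ChunkedStorage
            .newReader(indexStorage, "example_segment_key", new FileOutputStream("index-copy.zip"))
            .withBatchSize(10)
            .withConcurrencyLevel(10)
            .call();
    }
}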

View File

@@ -22,6 +22,8 @@ package com.metamx.druid.loading.cassandra;
 import java.io.File;
 import java.io.OutputStream;
+import org.apache.commons.io.FileUtils;
 import com.google.common.base.Throwables;
 import com.google.common.io.Files;
 import com.metamx.common.ISE;
@@ -41,13 +43,13 @@ import com.netflix.astyanax.recipes.storage.ObjectMetadata;
  *
  * @author boneill42
  */
 public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
 {
     private static final Logger log = new Logger(CassandraDataSegmentPuller.class);
     private static final int CONCURRENCY = 10;
     private static final int BATCH_SIZE = 10;

     public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
     {
         super(config);
     }
@@ -56,7 +58,6 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
 public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
 {
     String key = (String) segment.getLoadSpec().get("key");
     log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir);
     if (!outDir.exists())
@@ -71,22 +72,28 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements DataSegmentPuller
     long startTime = System.currentTimeMillis();
     ObjectMetadata meta = null;
+    final File outFile = new File(outDir, "index.zip");
     try
     {
-        final File outFile = new File(outDir, "index.zip");
-        log.info("Writing to [" + outFile.getAbsolutePath() + "]");
-        OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput();
-        meta = ChunkedStorage
-            .newReader(indexStorage, key, os)
-            .withBatchSize(BATCH_SIZE)
-            .withConcurrencyLevel(CONCURRENCY)
-            .call();
-        os.close();
-        CompressionUtils.unzip(outFile, outDir);
+        try
+        {
+            log.info("Writing to [" + outFile.getAbsolutePath() + "]");
+            OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput();
+            meta = ChunkedStorage
+                .newReader(indexStorage, key, os)
+                .withBatchSize(BATCH_SIZE)
+                .withConcurrencyLevel(CONCURRENCY)
+                .call();
+            os.close();
+            CompressionUtils.unzip(outFile, outDir);
+        } catch (Exception e)
+        {
+            FileUtils.deleteDirectory(outDir);
+        }
     } catch (Exception e)
     {
-        throw Throwables.propagate(e);
+        throw new SegmentLoadingException(e, e.getMessage());
     }
     log.info("Pull of file[%s] completed in %,d millis (%s bytes)", key, System.currentTimeMillis() - startTime,
         meta.getObjectSize());
 }
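Note that in the committed version the inner catch deletes outDir but swallows the original exception, so a failed pull only surfaces later (for example when meta is dereferenced in the final log line). A minimal sketch of the same cleanup-on-failure idea that also preserves the original error, using Java 7 try-with-resources; it reuses the class's indexStorage, BATCH_SIZE, CONCURRENCY, and log fields plus java.io.IOException, and is an illustration rather than the committed code:

public void getSegmentFilesSketch(String key, File outDir) throws SegmentLoadingException
{
    final File outFile = new File(outDir, "index.zip");
    try
    {
        ObjectMetadata meta;
        // Close the stream before unzipping so the zip on disk is complete.
        try (OutputStream os = Files.newOutputStreamSupplier(outFile).getOutput())
        {
            meta = ChunkedStorage
                .newReader(indexStorage, key, os)
                .withBatchSize(BATCH_SIZE)
                .withConcurrencyLevel(CONCURRENCY)
                .call();
        }
        CompressionUtils.unzip(outFile, outDir);
        log.info("Pulled [%s] (%s bytes)", key, meta.getObjectSize());
    } catch (Exception e)
    {
        try
        {
            FileUtils.deleteDirectory(outDir); // remove any partially written output
        } catch (IOException cleanupFailure)
        {
            // best-effort cleanup; the original failure is rethrown below
        }
        throw new SegmentLoadingException(e, e.getMessage());
    }
}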