NIFI-3273: Added nifi-toolkit-flowfile-repo, which contains a simple Java class capable of manually recovering a FlowFile Repository in the case of an operating system crash that results in trailing 0's being dumped into the edit logs. Also refactored the FlowFile repository into independent modules so that additional capabilities for examining the FlowFile repository can be added in the future.

Mark Payne 2017-03-22 13:08:30 -04:00 committed by joewitt
parent 141334c3c9
commit 0207f21ce4
27 changed files with 983 additions and 96 deletions


@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-flowfile-repo-serialization</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-repository-models</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-write-ahead-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-utils</artifactId>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;

import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SchemaRepositoryRecordSerdeTest {
public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier";
private StandardResourceClaimManager resourceClaimManager;
private SchemaRepositoryRecordSerde schemaRepositoryRecordSerde;
private Map<String, FlowFileQueue> queueMap;
private FlowFileQueue flowFileQueue;
private ByteArrayOutputStream byteArrayOutputStream;
private DataOutputStream dataOutputStream;
@Before
public void setup() {
resourceClaimManager = new StandardResourceClaimManager();
schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager);
queueMap = new HashMap<>();
schemaRepositoryRecordSerde.setQueueMap(queueMap);
flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER);
byteArrayOutputStream = new ByteArrayOutputStream();
dataOutputStream = new DataOutputStream(byteArrayOutputStream);
}
@After
public void teardown() {
resourceClaimManager.purge();
}
@Test
public void testV1CreateCantHandleLongAttributeName() throws IOException {
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 65536; i++) {
stringBuilder.append('a');
}
attributes.put(stringBuilder.toString(), "testValue");
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testV1CreateCantHandleLongAttributeValue() throws IOException {
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 65536; i++) {
stringBuilder.append('a');
}
attributes.put("testName", stringBuilder.toString());
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testV2CreateCanHandleLongAttributeName() throws IOException {
schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 65536; i++) {
stringBuilder.append('a');
}
attributes.put(stringBuilder.toString(), "testValue");
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testV2CreateCanHandleLongAttributeValue() throws IOException {
schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 65536; i++) {
stringBuilder.append('a');
}
attributes.put("testName", stringBuilder.toString());
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testRoundTripCreateV1ToV2() throws IOException {
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
attributes.put("testName", "testValue");
schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testV1SwapInCantHandleLongAttributeName() throws IOException {
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 65536; i++) {
stringBuilder.append('a');
}
attributes.put(stringBuilder.toString(), "testValue");
StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
record.setSwapLocation("fake");
assertEquals(SWAP_IN, record.getType());
schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testV1SwapInCantHandleLongAttributeValue() throws IOException {
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 65536; i++) {
stringBuilder.append('a');
}
attributes.put("testName", stringBuilder.toString());
StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
record.setSwapLocation("fake");
assertEquals(SWAP_IN, record.getType());
schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testV2SwapInCanHandleLongAttributeName() throws IOException {
schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 65536; i++) {
stringBuilder.append('a');
}
attributes.put(stringBuilder.toString(), "testValue");
StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
record.setSwapLocation("fake");
assertEquals(SWAP_IN, record.getType());
schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testV2SwapInCanHandleLongAttributeValue() throws IOException {
schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 65536; i++) {
stringBuilder.append('a');
}
attributes.put("testName", stringBuilder.toString());
StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
record.setSwapLocation("fake");
assertEquals(SWAP_IN, record.getType());
schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
}
@Test
public void testRoundTripSwapInV1ToV2() throws IOException {
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
Map<String, String> attributes = new HashMap<>();
attributes.put("testName", "testValue");
StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
record.setSwapLocation("fake");
assertEquals(SWAP_IN, record.getType());
schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
DataInputStream dataInputStream = createDataInputStream();
schemaRepositoryRecordSerde.readHeader(dataInputStream);
RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
assertEquals(SWAP_IN, repositoryRecord.getType());
}
private DataInputStream createDataInputStream() throws IOException {
dataOutputStream.flush();
return new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
}
private StandardRepositoryRecord createCreateFlowFileRecord(Map<String, String> attributes) {
StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue);
StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder();
flowFileRecordBuilder.addAttributes(attributes);
standardRepositoryRecord.setWorking(flowFileRecordBuilder.build());
return standardRepositoryRecord;
}
private FlowFileQueue createMockQueue(String identifier) {
FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn(identifier);
queueMap.put(identifier, flowFileQueue);
return flowFileQueue;
}
}
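The V1 "can't handle" assertions above are consistent with attribute names and values whose lengths are recorded in a two-byte field: the tests build strings of exactly 65,536 characters, one more than an unsigned short can hold. A minimal sketch of that arithmetic, assuming a two-byte length prefix (the V1 wire format is not shown in this diff, so the assumption is mine):

// Hypothetical illustration: a two-byte length field cannot represent 65536,
// so a 65,536-character attribute cannot round-trip intact.
public class TwoByteLengthSketch {
    public static void main(final String[] args) {
        final int length = 65536;            // one more than an unsigned short's maximum
        final int encoded = length & 0xFFFF; // what a two-byte length field would record
        System.out.println(encoded);         // prints 0, not 65536
    }
}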


@@ -55,6 +55,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-repository-models</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
@@ -135,6 +139,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-write-ahead-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-repo-serialization</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>


@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-repository-models</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
</dependencies>
</project>


@@ -29,6 +29,8 @@
<module>nifi-runtime</module>
<module>nifi-security</module>
<module>nifi-site-to-site</module>
<module>nifi-repository-models</module>
<module>nifi-flowfile-repo-serialization</module>
<module>nifi-framework-core</module>
<module>nifi-framework-cluster-protocol</module>
<module>nifi-framework-cluster</module>


@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-toolkit-flowfile-repo</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.repos.flowfile;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;

public class RepairCorruptedFileEndings {
private static final Pattern PARTITION_FILE_PATTERN = Pattern.compile("partition\\-\\d+");
private static void printUsage() {
System.out.println("Whenever a sudden power loss occurs, it is common with some operating systems for files that are being written to ");
System.out.println("to contain many NUL characters (hex 0) at the end of the file upon restart. If this happens to the FlowFile repository, ");
System.out.println("NiFi will be unable to recover, because it cannot properly read the repository. This utility attempts to read the FlowFile ");
System.out.println("Repository and write out a new copy of the repository, where the new copy does not contain the trailing NUL characters so ");
System.out.println("NiFi can be restarted by pointing at the new FlowFile Repository.");
System.out.println("Typically, this problem can be identified by seeing an error in the NiFi logs at startup, indicating either:");
System.out.println();
System.out.println("Caused by: java.io.IOException: Expected to read a Sentinel Byte of '1' but got a value of '0' instead");
System.out.println();
System.out.println("or:");
System.out.println();
System.out.println("Caused by: java.lang.IllegalArgumentException: No enum constant org.wali.UpdateType.");
System.out.println();
System.out.println();
System.out.println("Usage:");
System.out.println("java " + RepairCorruptedFileEndings.class.getCanonicalName() + " <repo input directory> <repo destination directory>");
System.out.println();
System.out.println("<repo input directory>: The existing FlowFile Repository Directory that contains corrupt data");
System.out.println("<repo destination directory>: The directory to write the repaired repository to");
System.out.println();
}
public static void main(final String[] args) {
if (args.length != 2) {
printUsage();
return;
}
final File inputDir = new File(args[0]);
if (!inputDir.exists()) {
System.out.println("Input Repository Directory " + inputDir + " does not exist");
return;
}
final File[] inputFiles = inputDir.listFiles();
if (inputFiles == null) {
System.out.println("Could not access files within input Repository Directory " + inputDir);
return;
}
final List<File> partitionDirs = Stream.of(inputFiles)
.filter(RepairCorruptedFileEndings::isPartitionDirectory)
.collect(Collectors.toList());
if (partitionDirs.isEmpty()) {
System.out.println("Found no partitions within input Repository Directory " + inputDir);
return;
}
final File outputDir = new File(args[1]);
if (outputDir.exists()) {
final File[] children = outputDir.listFiles();
if (children == null) {
System.out.println("Cannot access output Repository Directory " + outputDir);
return;
}
if (children.length > 0) {
System.out.println("Output Repository Directory " + outputDir + " already exists and has files or sub-directories. "
+ "The output directory must either not exist or be empty.");
return;
}
} else if (!outputDir.mkdirs()) {
System.out.println("Failed to create output Repository Directory " + outputDir);
return;
}
final List<File> nonPartitionDirFiles = Stream.of(inputFiles)
.filter(f -> !isPartitionDirectory(f))
.filter(f -> !f.getName().equals("wali.lock"))
.collect(Collectors.toList());
for (final File nonPartitionFile : nonPartitionDirFiles) {
final File destination = new File(outputDir, nonPartitionFile.getName());
try {
copy(nonPartitionFile, destination);
} catch (final IOException e) {
System.out.println("Failed to copy source file " + nonPartitionFile + " to destination file " + destination);
e.printStackTrace();
}
}
int fullCopies = 0;
int partialCopies = 0;
for (final File partitionDir : partitionDirs) {
final File[] partitionFiles = partitionDir.listFiles();
if (partitionFiles == null) {
System.out.println("Could not access children of input sub-directory " + partitionDir);
return;
}
final File outputPartitionDir = new File(outputDir, partitionDir.getName());
if (!outputPartitionDir.mkdirs()) {
System.out.println("Failed to created output directory " + outputPartitionDir);
return;
}
for (final File partitionFile : partitionFiles) {
final File destinationFile = new File(outputPartitionDir, partitionFile.getName());
// All journal files follow the pattern of:
// <journal entry> <TRANSACTION_CONTINUE | TRANSACTION_COMMIT> <journal entry> <TRANSACTION_CONTINUE | TRANSACTION_COMMIT> ...
// The TRANSACTION_CONTINUE byte is a 1 while the TRANSACTION_COMMIT byte is a 2. So if we have 0's at the end then we know
that we can simply truncate up to the point where we encounter the first of the trailing zeroes. At that point,
// we know that we are done. It is possible that the repo will still be 'corrupt' in that only part of a transaction was
// written out. However, this is okay because the repo will recover from this on restart. What it does NOT properly recover
// from on restart is when the file ends with a bunch of 0's because it believes that the Transaction ID is zero and then
// it reads in 0 bytes for the "Update Type" and as a result we get an invalid enum name because it thinks that the name of
// the UpdateType is an empty string because it's a string of length 0.
final int trailingZeroes;
try {
trailingZeroes = countTrailingZeroes(partitionFile);
} catch (final Exception e) {
System.out.println("Failed to read input file " + partitionFile);
e.printStackTrace();
return;
}
if (trailingZeroes > 0) {
final long goodLength = partitionFile.length() - trailingZeroes;
try {
copy(partitionFile, destinationFile, goodLength);
partialCopies++;
} catch (final Exception e) {
System.out.println("Failed to copy " + goodLength + " bytes from " + partitionFile + " to " + destinationFile);
e.printStackTrace();
return;
}
} else {
try {
copy(partitionFile, destinationFile);
} catch (final Exception e) {
System.out.println("Failed to copy entire file from " + partitionFile + " to " + destinationFile);
e.printStackTrace();
return;
}
fullCopies++;
}
}
}
System.out.println("Successfully copied " + fullCopies + " journal files fully and truncated " + partialCopies + " journal files in output directory");
}
private static boolean isPartitionDirectory(final File file) {
return PARTITION_FILE_PATTERN.matcher(file.getName()).matches();
}
private static void copy(final File input, final File destination) throws IOException {
if (input.isFile()) {
copyFile(input, destination);
} else {
copyDirectory(input, destination);
}
}
private static void copyDirectory(final File input, final File destination) throws IOException {
if (!destination.exists() && !destination.mkdirs()) {
System.out.println("Failed to copy input directory " + input + " to destination because destination directory " + destination
+ " does not exist and could not be created");
return;
}
final File[] children = input.listFiles();
if (children == null) {
System.out.println("Failed to copy input directory " + input + " to destination because could not access files of input directory");
return;
}
for (final File child : children) {
final File destinationChild = new File(destination, child.getName());
copy(child, destinationChild);
}
}
private static void copyFile(final File input, final File destination) throws IOException {
if (!input.exists()) {
return;
}
Files.copy(input.toPath(), destination.toPath(), StandardCopyOption.COPY_ATTRIBUTES);
}
private static void copy(final File input, final File destination, final long length) throws IOException {
try (final InputStream fis = new FileInputStream(input);
final LimitingInputStream in = new LimitingInputStream(fis, length);
final OutputStream fos = new FileOutputStream(destination)) {
StreamUtils.copy(in, fos);
}
}
static int countTrailingZeroes(final File partitionFile) throws IOException {
// Use try-with-resources so the file handle is closed even if a read fails.
try (final RandomAccessFile raf = new RandomAccessFile(partitionFile, "r")) {
long startPos = partitionFile.length() - 4096;
int count = 0;
boolean reachedStartOfFile = false;
while (!reachedStartOfFile) {
int bufferLength = 4096;
if (startPos < 0) {
// The remaining prefix is smaller than one buffer; read it from offset 0.
bufferLength = (int) (startPos + 4096);
startPos = 0;
reachedStartOfFile = true;
}
raf.seek(startPos);
final byte[] buffer = new byte[bufferLength];
final int read = fillBuffer(raf, buffer);
// Scan this chunk backwards, counting zeroes until the first non-zero byte.
for (int i = read - 1; i >= 0; i--) {
final byte b = buffer[i];
if (b == 0) {
count++;
} else {
return count;
}
}
startPos -= 4096;
}
return count;
}
}
private static int fillBuffer(final RandomAccessFile source, final byte[] destination) throws IOException {
int bytesRead = 0;
int len;
while (bytesRead < destination.length) {
len = source.read(destination, bytesRead, destination.length - bytesRead);
if (len < 0) {
break;
}
bytesRead += len;
}
return bytesRead;
}
}
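For reference, a hypothetical invocation of the utility above, following the usage text it prints. The jar names, versions, and repository paths are illustrative, not from the commit, and the toolkit jar's nifi-utils dependency must also be on the classpath:

java -cp nifi-toolkit-flowfile-repo-1.2.0-SNAPSHOT.jar:nifi-utils-1.2.0-SNAPSHOT.jar \
    org.apache.nifi.toolkit.repos.flowfile.RepairCorruptedFileEndings \
    /var/lib/nifi/flowfile_repository /var/lib/nifi/flowfile_repository_repaired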


@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.repos.flowfile;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class TestRepairCorruptedFileEndings {
private final File targetFile = new File("target/1.bin");
@Before
@After
public void cleanup() {
if (targetFile.exists()) {
Assert.assertTrue(targetFile.delete());
}
}
@Test
public void testEndsWithZeroesGreaterThanBufferSize() throws IOException {
final byte[] data = new byte[4096 + 8];
for (int i = 0; i < 4096; i++) {
data[i] = 'A';
}
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(8, zeroCount);
}
@Test
public void testEndsWithZeroesSmallerThanBufferSize() throws IOException {
final byte[] data = new byte[1024];
for (int i = 0; i < 1020; i++) {
data[i] = 'A';
}
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(4, zeroCount);
}
@Test
public void testEndsWithZeroesEqualToBufferSize() throws IOException {
final byte[] data = new byte[4096];
for (int i = 0; i < 4090; i++) {
data[i] = 'A';
}
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(6, zeroCount);
}
@Test
public void testAllZeroesGreaterThanBufferSize() throws IOException {
final byte[] data = new byte[4096 + 8];
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(4096 + 8, zeroCount);
}
@Test
public void testAllZeroesEqualToBufferSize() throws IOException {
final byte[] data = new byte[4096];
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(4096, zeroCount);
}
@Test
public void testAllZeroesSmallerThanBufferSize() throws IOException {
final byte[] data = new byte[1024];
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(1024, zeroCount);
}
@Test
public void testSmallerThanBufferSize() throws IOException {
final byte[] data = new byte[1024];
for (int i = 0; i < 1020; i++) {
data[i] = 'A';
}
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(4, zeroCount);
}
@Test
public void testSmallerThanBufferSizeNoTrailingZeroes() throws IOException {
final byte[] data = new byte[1024];
for (int i = 0; i < 1024; i++) {
data[i] = 'A';
}
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(0, zeroCount);
}
@Test
public void testLargerThanBufferSizeNoTrailingZeroes() throws IOException {
final byte[] data = new byte[8192];
for (int i = 0; i < 8192; i++) {
data[i] = 'A';
}
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(0, zeroCount);
}
@Test
public void testEqualToBufferSizeNoTrailingZeroes() throws IOException {
final byte[] data = new byte[4096];
for (int i = 0; i < 4096; i++) {
data[i] = 'A';
}
Files.write(targetFile.toPath(), data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
final int zeroCount = RepairCorruptedFileEndings.countTrailingZeroes(targetFile);
assertEquals(0, zeroCount);
}
}
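The boundary cases above (smaller than, equal to, and larger than the 4096-byte buffer) exercise the chunked backward scan. Stripped of the chunking, the repair idea reduces to the following sketch; the file paths are hypothetical and the whole file is read into memory, which is fine for small fixtures but not what the utility itself does:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class TruncateTrailingZeroesSketch {
    public static void main(final String[] args) throws IOException {
        final Path source = Paths.get("target/journal.bin"); // hypothetical corrupted journal
        final byte[] data = Files.readAllBytes(source);
        int goodLength = data.length;
        while (goodLength > 0 && data[goodLength - 1] == 0) {
            goodLength--; // walk back over the trailing NUL padding
        }
        // Write only the good prefix to the repaired copy.
        Files.write(Paths.get("target/journal-repaired.bin"), Arrays.copyOf(data, goodLength));
    }
}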


@@ -27,6 +27,7 @@
<module>nifi-toolkit-encrypt-config</module>
<module>nifi-toolkit-s2s</module>
<module>nifi-toolkit-zookeeper-migrator</module>
<module>nifi-toolkit-flowfile-repo</module>
<module>nifi-toolkit-assembly</module>
</modules>
<dependencyManagement>

pom.xml

@@ -1,15 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache</groupId>
@@ -87,7 +88,7 @@ language governing permissions and limitations under the License. -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.min-version>3.1.0</maven.min-version>
<maven.surefire.arguments/>
<maven.surefire.arguments />
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<inceptionYear>2014</inceptionYear>
@@ -330,9 +331,9 @@ language governing permissions and limitations under the License. -->
<artifactId>quartz</artifactId>
<version>2.2.1</version>
<exclusions>
<!-- | Exclude the quartz 2.2.1 bundled version of c3p0 because it is
lgpl licensed | We also don't use the JDBC related features of quartz for
which the dependency would matter -->
<!-- | Exclude the quartz 2.2.1 bundled version of c3p0
because it is lgpl licensed | We also don't use the JDBC related features
of quartz for which the dependency would matter -->
<exclusion>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
@@ -401,8 +402,8 @@ language governing permissions and limitations under the License. -->
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
<exclusions>
<!-- <artifactId>jcl-over-slf4j</artifactId> is used in dependencies
section -->
<!-- <artifactId>jcl-over-slf4j</artifactId> is used
in dependencies section -->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
@@ -870,6 +871,16 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-expression-language</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-repo-serialization</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-repository-models</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-custom-ui-utilities</artifactId>
@@ -1600,7 +1611,9 @@ language governing permissions and limitations under the License. -->
<include>**/*Spec.class</include>
</includes>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine combine.children="append">-Xmx1G -Djava.net.preferIPv4Stack=true ${maven.surefire.arguments}</argLine>
<argLine combine.children="append">-Xmx1G
-Djava.net.preferIPv4Stack=true
${maven.surefire.arguments}</argLine>
</configuration>
<dependencies>
<dependency>
@@ -1804,71 +1817,100 @@ language governing permissions and limitations under the License. -->
<!-- Checks for whitespace -->
<!-- See http://checkstyle.sf.net/config_whitespace.html -->
<module name="FileTabCharacter">
<property name="eachLine" value="true" />
<property name="eachLine"
value="true" />
</module>
<module name="TreeWalker">
<module name="RegexpSinglelineJava">
<property name="format" value="\s+$" />
<property name="message" value="Line has trailing whitespace." />
<property name="format"
value="\s+$" />
<property name="message"
value="Line has trailing whitespace." />
</module>
<module name="RegexpSinglelineJava">
<property name="format" value="[@]see\s+[{][@]link" />
<property name="message" value="Javadoc @see does not need @link: pick one or the other." />
<property name="format"
value="[@]see\s+[{][@]link" />
<property name="message"
value="Javadoc @see does not need @link: pick one or the other." />
</module>
<module name="OuterTypeFilename" />
<module name="LineLength">
<!-- needs extra, because Eclipse formatter ignores the ending left
brace -->
<!-- needs extra, because Eclipse formatter
ignores the ending left brace -->
<property name="max" value="200" />
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" />
<property name="ignorePattern"
value="^package.*|^import.*|a href|href|http://|https://|ftp://" />
</module>
<module name="AvoidStarImport" />
<module name="UnusedImports">
<property name="processJavadoc" value="true" />
<property name="processJavadoc"
value="true" />
</module>
<module name="NoLineWrap" />
<module name="LeftCurly">
<property name="maxLineLength" value="160" />
<property name="maxLineLength"
value="160" />
</module>
<module name="RightCurly" />
<module name="RightCurly">
<property name="option" value="alone" />
<property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" />
<property name="option"
value="alone" />
<property name="tokens"
value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" />
</module>
<module name="SeparatorWrap">
<property name="tokens" value="DOT" />
<property name="option" value="nl" />
<property name="tokens"
value="DOT" />
<property name="option"
value="nl" />
</module>
<module name="SeparatorWrap">
<property name="tokens" value="COMMA" />
<property name="option" value="EOL" />
<property name="tokens"
value="COMMA" />
<property name="option"
value="EOL" />
</module>
<module name="PackageName">
<property name="format" value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" />
<property name="format"
value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" />
</module>
<module name="MethodTypeParameterName">
<property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" />
<property name="format"
value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" />
</module>
<module name="MethodParamPad" />
<module name="OperatorWrap">
<property name="option" value="NL" />
<property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " />
<property name="option"
value="NL" />
<property name="tokens"
value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " />
</module>
<module name="AnnotationLocation">
<property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" />
<property name="tokens"
value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" />
</module>
<module name="AnnotationLocation">
<property name="tokens" value="VARIABLE_DEF" />
<property name="allowSamelineMultipleAnnotations" value="true" />
<property name="tokens"
value="VARIABLE_DEF" />
<property
name="allowSamelineMultipleAnnotations"
value="true" />
</module>
<module name="NonEmptyAtclauseDescription" />
<module name="JavadocMethod">
<property name="allowMissingJavadoc" value="true" />
<property name="allowMissingParamTags" value="true" />
<property name="allowMissingThrowsTags" value="true" />
<property name="allowMissingReturnTag" value="true" />
<property name="allowedAnnotations" value="Override,Test,BeforeClass,AfterClass,Before,After" />
<property name="allowThrowsTagsForSubclasses" value="true" />
<property name="allowMissingJavadoc"
value="true" />
<property name="allowMissingParamTags"
value="true" />
<property name="allowMissingThrowsTags"
value="true" />
<property name="allowMissingReturnTag"
value="true" />
<property name="allowedAnnotations"
value="Override,Test,BeforeClass,AfterClass,Before,After" />
<property
name="allowThrowsTagsForSubclasses"
value="true" />
</module>
<module name="SingleLineJavadoc" />
</module>
@@ -1908,9 +1950,10 @@ language governing permissions and limitations under the License. -->
</build>
<profiles>
<profile>
<!-- Performs execution of Integration Tests using the Maven FailSafe Plugin. The view of integration tests in this context
are those tests interfacing with external sources and services requiring additional resources or credentials that cannot
be explicitly provided. -->
<!-- Performs execution of Integration Tests using the Maven
FailSafe Plugin. The view of integration tests in this context are those
tests interfacing with external sources and services requiring additional
resources or credentials that cannot be explicitly provided. -->
<id>integration-tests</id>
<build>
<plugins>
@@ -1930,11 +1973,11 @@ language governing permissions and limitations under the License. -->
</build>
</profile>
<profile>
<!-- Checks style and licensing requirements. This is a good idea to run
for contributions and for the release process. While it would be nice to
run always these plugins can considerably slow the build and have proven
to create unstable builds in our multi-module project and when building using
multiple threads. The stability issues seen with Checkstyle in multi-module
<!-- Checks style and licensing requirements. This is a good
idea to run for contributions and for the release process. While it would
be nice to run always these plugins can considerably slow the build and have
proven to create unstable builds in our multi-module project and when building
using multiple threads. The stability issues seen with Checkstyle in multi-module
builds include false-positives and false negatives. -->
<id>contrib-check</id>
<build>
@@ -1991,14 +2034,16 @@ language governing permissions and limitations under the License. -->
</pluginManagement>
</build>
</profile>
<!-- The following profiles are here as a convenience for folks that want to build against vendor-specific
distributions of the various Hadoop ecosystem libraries. These will alter which dependencies are sourced
in a manner that can adjust the correct LICENSE and NOTICE requirements for any affected jar and the
resulting assembly overall. These L&N impacts are not automatically handled by the build process and are
the responsibility of those creating and using the resulting binary artifacts. -->
<!-- The following profiles are here as a convenience for folks that
want to build against vendor-specific distributions of the various Hadoop
ecosystem libraries. These will alter which dependencies are sourced in a
manner that can adjust the correct LICENSE and NOTICE requirements for any
affected jar and the resulting assembly overall. These L&N impacts are not
automatically handled by the build process and are the responsibility of
those creating and using the resulting binary artifacts. -->
<profile>
<!-- This profile adds the Hortonworks repository for resolving Hortonworks Data Platform (HDP)
artifacts for the Hadoop bundles -->
<!-- This profile adds the Hortonworks repository for resolving
Hortonworks Data Platform (HDP) artifacts for the Hadoop bundles -->
<id>hortonworks</id>
<repositories>
<repository>
@@ -2033,15 +2078,13 @@ language governing permissions and limitations under the License. -->
</repository>
</repositories>
<properties>
<!-- Vendor-specific version number included here as default, should be overridden on the
command-line
<hadoop.version>2.7.1.2.4.0.0-169</hadoop.version>
-->
<!-- Vendor-specific version number included here as default,
should be overridden on the command-line <hadoop.version>2.7.1.2.4.0.0-169</hadoop.version> -->
</properties>
</profile>
<profile>
<!-- This profile will add the MapR repository for resolving MapR Hadoop
artifacts for the Hadoop bundles -->
<!-- This profile will add the MapR repository for resolving
MapR Hadoop artifacts for the Hadoop bundles -->
<id>mapr</id>
<repositories>
<repository>
@@ -2057,15 +2100,13 @@ language governing permissions and limitations under the License. -->
</repository>
</repositories>
<properties>
<!-- Vendor-specific version number included here as default, should be overridden on the
command-line
<hadoop.version>2.7.0-mapr-1602</hadoop.version>
-->
<!-- Vendor-specific version number included here as default,
should be overridden on the command-line <hadoop.version>2.7.0-mapr-1602</hadoop.version> -->
</properties>
</profile>
<profile>
<!-- This profile will add the Cloudera repository for resolving Cloudera Distribution of Hadoop (CDH)
artifacts for the Hadoop bundles -->
<!-- This profile will add the Cloudera repository for resolving
Cloudera Distribution of Hadoop (CDH) artifacts for the Hadoop bundles -->
<id>cloudera</id>
<repositories>
<repository>
@@ -2081,10 +2122,8 @@ language governing permissions and limitations under the License. -->
</repository>
</repositories>
<properties>
<!-- Vendor-specific version number included here as default, should be overridden on the
command-line
<hadoop.version>2.6.0-cdh5.8.1</hadoop.version>
-->
<!-- Vendor-specific version number included here as default,
should be overridden on the command-line <hadoop.version>2.6.0-cdh5.8.1</hadoop.version> -->
</properties>
</profile>
</profiles>