NIFI-6895: Fix PutKudu processor concurrency issues

`trigger()` may be called concurrently from different threads, but the
PutKudu processor stores the `kuduSession` in a class-level field. This can
result in the logging issue reported in NIFI-6895 and likely other anomalies,
including performance issues, depending on the processor configuration.

The `operationType` was also stored in a class-level field and could be set
concurrently, resulting in the wrong operation type being used.

This patch fixes the issue by making both `kuduSession` and `operationType`
local variables. Some minor code cleanup is also included.
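
To illustrate the race (a minimal sketch, not the actual PutKudu code; the
class names here are hypothetical):

```java
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;

// Hypothetical sketch of the bug: a class-level session shared by
// concurrent trigger() calls can be replaced mid-flight.
class RacySketch {
    private KuduSession kuduSession; // shared across threads

    void trigger(final KuduClient client) {
        kuduSession = client.newSession(); // thread B can overwrite thread A's session
        // ... thread A may then flush and close a session it never created
    }
}

// The fix: confine the session to the invoking thread.
class FixedSketch {
    void trigger(final KuduClient client) {
        final KuduSession kuduSession = client.newSession(); // local, thread-confined
        // ... only this thread can see, flush, and close it
    }
}
```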

An integration test, ITPutKudu, was added and used to manually verify that the
logging issue existed and is fixed by this patch. I ran the test using
`mvn -Pintegration-tests verify -Dtest=ITPutKudu`

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3910.
Grant Henke, 2019-12-02 11:41:18 -06:00 (committed by Pierre Villard)
commit 1898ad44be (parent 955784031f)
5 changed files with 286 additions and 27 deletions

File: nifi-kudu-processors/pom.xml

@@ -24,6 +24,32 @@
<artifactId>nifi-kudu-processors</artifactId>
<packaging>jar</packaging>
+ <properties>
+ <exclude.tests>None</exclude.tests>
+ <kudu.version>1.10.0</kudu.version>
+ </properties>
+ <build>
+ <extensions>
+ <!-- Used to find the right kudu-binary artifact with the Maven
+ property ${os.detected.classifier} -->
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>${exclude.tests}</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -46,7 +72,7 @@
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
- <version>1.10.0</version>
+ <version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -63,6 +89,12 @@
<version>1.11.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-test-utils</artifactId>
+ <version>${kudu.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
@@ -81,5 +113,54 @@
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>kudu-windows</id>
+ <activation>
+ <os>
+ <family>Windows</family>
+ </os>
+ </activation>
+ <properties>
+ <!-- Kudu tests do not support Windows. -->
+ <exclude.tests>**/*.java</exclude.tests>
+ </properties>
+ </profile>
+ <profile>
+ <id>kudu-linux</id>
+ <activation>
+ <os>
+ <family>Unix</family>
+ </os>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-binary</artifactId>
+ <version>${kudu.version}</version>
+ <classifier>${os.detected.classifier}</classifier>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>kudu-mac</id>
+ <activation>
+ <os>
+ <family>mac</family>
+ </os>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-binary</artifactId>
+ <version>${kudu.version}</version>
+ <classifier>${os.detected.classifier}</classifier>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>

File: AbstractKuduProcessor.java

@@ -121,8 +121,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
protected KuduClient buildClient(final String masters, final ProcessContext context) {
- final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
return new KuduClient.KuduClientBuilder(masters)
.defaultOperationTimeoutMs(operationTimeout)

File: PutKudu.java

@@ -175,11 +175,10 @@ public class PutKudu extends AbstractKuduProcessor {
public static final String RECORD_COUNT_ATTR = "record.count";
- protected OperationType operationType;
- protected SessionConfiguration.FlushMode flushMode;
+ // Properties set in onScheduled.
protected int batchSize = 100;
protected int ffbatch = 1;
+ protected SessionConfiguration.FlushMode flushMode;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -208,9 +207,6 @@ public class PutKudu extends AbstractKuduProcessor {
return rels;
}
- protected KerberosUser kerberosUser;
- protected KuduSession kuduSession;
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException, LoginException {
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -225,9 +221,8 @@ public class PutKudu extends AbstractKuduProcessor {
if (flowFiles.isEmpty()) {
return;
}
- kerberosUser = getKerberosUser();
- final KerberosUser user = kerberosUser;
+ final KerberosUser user = getKerberosUser();
if (user == null) {
trigger(context, session, flowFiles);
return;
@@ -246,28 +241,40 @@ public class PutKudu extends AbstractKuduProcessor {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final KuduClient kuduClient = getKuduClient();
- kuduSession = getKuduSession(kuduClient);
+ final KuduSession kuduSession = createKuduSession(kuduClient);
final Map<FlowFile, Integer> numRecords = new HashMap<>();
final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
int numBuffered = 0;
+ OperationType prevOperationType = OperationType.INSERT;
+ final List<RowError> pendingRowErrors = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
- operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
- Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
- Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
+ final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
+ final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final RecordSet recordSet = recordReader.createRecordSet();
final List<String> fieldNames = recordReader.getSchema().getFieldNames();
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final KuduTable kuduTable = kuduClient.openTable(tableName);
+ // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
+ // Because the session is shared across flow files, for batching efficiency, we
+ // need to flush when changing to and from INSERT_IGNORE operation types.
+ // This should be updated and simplified when KUDU-1563 is completed.
+ if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
+ flushKuduSession(kuduSession, false, pendingRowErrors);
+ kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
+ }
+ prevOperationType = operationType;
Record record = recordSet.next();
while (record != null) {
- Operation operation = getKuduOperationType(operationType, record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
+ Operation operation = createKuduOperation(operationType, record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
// We keep track of mappings between Operations and their origins,
// so that we know which FlowFiles should be marked failure after buffered flush.
operationFlowFileMap.put(operation, flowFile);
@@ -341,25 +348,20 @@ public class PutKudu extends AbstractKuduProcessor {
session.adjustCounter("Records Inserted", totalCount, false);
}
- protected KuduSession getKuduSession(final KuduClient client) {
+ protected KuduSession createKuduSession(final KuduClient client) {
final KuduSession kuduSession = client.newSession();
kuduSession.setMutationBufferSpace(batchSize);
kuduSession.setFlushMode(flushMode);
- if (operationType == OperationType.INSERT_IGNORE) {
- kuduSession.setIgnoreAllDuplicateRows(true);
- }
return kuduSession;
}
- private Operation getKuduOperationType(OperationType operationType, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields, KuduTable kuduTable) {
+ private Operation createKuduOperation(OperationType operationType, Record record,
+ List<String> fieldNames, Boolean ignoreNull,
+ Boolean lowercaseFields, KuduTable kuduTable) {
switch (operationType) {
case DELETE:
return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
case INSERT:
- return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
case INSERT_IGNORE:
return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
case UPSERT:

File: ITPutKudu.java (new file)

@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.cluster.MiniKuduCluster;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
public class ITPutKudu {
public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
// The KuduTestHarness automatically starts and stops a real Kudu cluster
// when each test is run. Kudu persists its on-disk state in a temporary
// directory under a location defined by the environment variable TEST_TMPDIR
// if set, or under /tmp otherwise. That cluster data is deleted on
// successful exit of the test. The cluster output is logged through slf4j.
@Rule
public KuduTestHarness harness = new KuduTestHarness(
new MiniKuduCluster.MiniKuduClusterBuilder()
.addMasterServerFlag("--use_hybrid_clock=false")
.addTabletServerFlag("--use_hybrid_clock=false")
);
private TestRunner testRunner;
private PutKudu processor;
private MockRecordParser readerFactory;
@Before
public void setUp() throws Exception {
processor = new PutKudu();
testRunner = TestRunners.newTestRunner(processor);
createKuduTable();
setUpTestRunner(testRunner);
}
@After
public void tearDown() {
testRunner = null;
}
private void setUpTestRunner(TestRunner testRunner) {
testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
testRunner.setProperty(PutKudu.KUDU_MASTERS, harness.getMasterAddressesAsString());
testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, "false");
testRunner.setProperty(PutKudu.IGNORE_NULL, "true");
testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "false");
testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");
testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString());
}
private void createKuduTable() throws KuduException {
KuduClient client = harness.getClient();
List<ColumnSchema> columns = new ArrayList<>();
columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("stringVal", Type.STRING).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("num32Val", Type.INT32).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleVal", Type.DOUBLE).build());
Schema schema = new Schema(columns);
CreateTableOptions opts = new CreateTableOptions()
.addHashPartitions(Collections.singletonList("id"), 4);
client.createTable(DEFAULT_TABLE_NAME, schema, opts);
}
private void createRecordReader(int numOfRecord) throws InitializationException {
readerFactory = new MockRecordParser();
readerFactory.addSchemaField("id", RecordFieldType.INT);
readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
for (int i = 0; i < numOfRecord; i++) {
readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
}
testRunner.addControllerService("mock-reader-factory", readerFactory);
testRunner.enableControllerService(readerFactory);
}
@Test
public void testWriteKudu() throws IOException, InitializationException {
final int recordCount = 100;
final int numFlowFiles = 5;
createRecordReader(recordCount);
final String filename = "testWriteKudu-" + System.currentTimeMillis();
final Map<String,String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
// Use values to ensure multiple batches and multiple flow files per-trigger
testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.UPSERT.toString());
testRunner.setProperty(PutKudu.BATCH_SIZE, "10");
testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2");
// Increase the thread count to better simulate a production environment
testRunner.setThreadCount(4);
// Trigger the flow
IntStream.range(0, numFlowFiles).forEach(i ->
testRunner.enqueue("trigger", flowFileAttributes));
testRunner.run(numFlowFiles);
testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, numFlowFiles);
// verify the successful flow file has the expected content & attributes
final MockFlowFile mockFlowFile =
testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
mockFlowFile.assertContentEquals("trigger");
// verify we generated provenance events
final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
Assert.assertEquals(numFlowFiles, provEvents.size());
// verify it was a SEND event with the correct URI
final ProvenanceEventRecord provEvent = provEvents.get(0);
Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
// Verify Kudu record count.
KuduClient client = harness.getClient();
KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
KuduScanner scanner = client.newScannerBuilder(kuduTable).build();
int count = 0;
for (RowResult unused : scanner) {
count++;
}
Assert.assertEquals(recordCount, count);
}
}

File: MockPutKudu.java

@@ -162,7 +162,7 @@ public class MockPutKudu extends PutKudu {
}
@Override
- protected KuduSession getKuduSession(KuduClient client) {
+ protected KuduSession createKuduSession(KuduClient client) {
return session;
}
}