Skip to content

Commit dfa5f52

Browse files
authored
feat: adding NanoTDF support (#5)
Adds support for NanoTDF: #3 Pending upstream [java-sdk Nano PR](opentdf/java-sdk#46)
1 parent 56e22ed commit dfa5f52

15 files changed

+664
-169
lines changed

README.md

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,43 +3,65 @@ Integration of the [OpenTDF Platform](https://github.com/opentdf/platform) into
33

44
Components:
55
* "Zero Trust Data Format" (ZTDF) Processors:
6-
* [ConvertToZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java): A NiFi processor that converts FlowFile content to TDF format. Does not currently support assertions
7-
* [ConvertFromZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java): A NiFi processor that converts TDF formatted FlowFile content to it's plaintext representation
6+
* [ConvertToZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToZTDF.java): A NiFi processor that converts FlowFile content to ZTDF format. Does not currently support assertions
7+
* [ConvertFromZTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java): A NiFi processor that converts ZTDF formatted FlowFile content to it's plaintext representation
8+
* NanoTDF Processors ([See NanoTDF Specification](https://github.com/opentdf/spec/tree/main/schema/nanotdf#readme)):
9+
* [ConvertToNanoTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertToNanoTDF.java): A NiFi processor that converts FlowFile content to NanoTDF format. Does not currently support assertions
10+
* [ConvertFromNanoTDF](./nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromNanoTDF.java): A NiFi processor that converts NanoTDF formatted FlowFile content to it's plaintext representation
11+
812
* Controller Services:
913
* [OpenTDFControllerService](./nifi-tdf-controller-services-api/src/main/java/io/opentdf/nifi/OpenTDFControllerService.java): A NiFi controller service providing OpenTDF Platform Configuration
1014

1115

12-
#### FlowChart: Generic Plaintext to ZTDF Nifi Flow
16+
#### FlowChart: Generic ZTDF Nifi Flows
1317

1418
```mermaid
1519
---
16-
title: Generic Plaintext to ZTDF NiFi Flow
20+
title: Generic ZTDF NiFi Flows
1721
---
1822
flowchart TD
19-
a[FlowFile: \nPlaintext content]
23+
a[Nifi Processor]
2024
b["`**UpdateAttribute**`" Add data policy attributes to FlowFile]
2125
c["`**ConvertToZTDF**`"]
2226
d["Process ZTDF"]
2327
e["Handle Error"]
24-
a -- success --> b
25-
b -- success --> c
28+
f[Nifi Processor]
29+
g["`**ConvertFromZTDF**`"]
30+
h[Process Plaintext]
31+
i[Handle Error]
32+
a -- success (content = PlainText) --> b
33+
b -- success (content = PlainText) --> c
2634
c -- success (content = ZTDF) --> d
2735
c -- failure --> e
36+
f -- success (content = ZTDF) --> g
37+
g -- success (content = PlainText) --> h
38+
g -- failure --> i
2839
```
2940

30-
#### FlowChart: Generic ZTDF to Plaintext Nifi Flow
41+
#### FlowChart: Generic NanoTDF NiFi Flows
3142
```mermaid
3243
---
33-
title: Generic ZTDF to Plaintext Nifi Flow
44+
title: Generic NanoTDF NiFi Flows
3445
---
3546
flowchart TD
36-
a[FlowFile: \nZTDF content]
37-
b["`**ConvertFromZTDF**`"]
38-
c["Process ZTDF"]
39-
d["Handle Error"]
40-
a -- success --> b
41-
b -- success (content = plaintext) --> c
42-
b -- failure --> d
47+
a[Nifi Processor]
48+
b["`**UpdateAttribute**`" Add data policy attributes to FlowFile]
49+
c["`**ConvertToNanoTDF**`"]
50+
d["Process NanoTDF"]
51+
e["Handle Error"]
52+
e2["Handle Max Size Error"]
53+
f[Nifi Processor]
54+
g["`**ConvertFromZTDF**`"]
55+
h[Process Plaintext]
56+
i[Handle Error]
57+
a -- success (content = Plaintext) --> b
58+
b -- success (content = Plaintext)--> c
59+
c -- success (content = NanoTDF) --> d
60+
c -- failure --> e
61+
c -- exceeds_size_limit --> e2
62+
f -- success (content = NanoTDF) --> g
63+
g -- success (content = Plaintext) --> h
64+
g -- failure --> i
4365
```
4466

4567
# Quick Start - Docker Compose

nifi-tdf-processors/src/main/java/io/opentdf/nifi/AbstractTDFProcessor.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.opentdf.nifi;
22

3+
import io.opentdf.platform.sdk.NanoTDF;
34
import io.opentdf.platform.sdk.SDK;
45
import io.opentdf.platform.sdk.SDKBuilder;
6+
import io.opentdf.platform.sdk.TDF;
57
import org.apache.commons.io.IOUtils;
68
import org.apache.nifi.components.PropertyDescriptor;
79
import org.apache.nifi.components.PropertyValue;
@@ -11,6 +13,7 @@
1113
import org.apache.nifi.processor.ProcessContext;
1214
import org.apache.nifi.processor.ProcessSession;
1315
import org.apache.nifi.processor.Relationship;
16+
import org.apache.nifi.processor.exception.ProcessException;
1417
import org.apache.nifi.processor.io.OutputStreamCallback;
1518
import org.apache.nifi.processor.util.StandardValidators;
1619
import org.apache.nifi.ssl.SSLContextService;
@@ -19,9 +22,7 @@
1922
import java.io.IOException;
2023
import java.io.InputStream;
2124
import java.io.OutputStream;
22-
import java.util.Arrays;
23-
import java.util.HashSet;
24-
import java.util.Set;
25+
import java.util.*;
2526

2627
/**
2728
* Common helper processor
@@ -133,4 +134,35 @@ byte[] readEntireFlowFile(FlowFile flowFile, ProcessSession processSession) {
133134
processSession.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
134135
return buffer;
135136
}
137+
138+
@Override
139+
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
140+
List<FlowFile> flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger());
141+
if (!flowFiles.isEmpty()) {
142+
processFlowFiles(processContext, processSession, flowFiles);
143+
}
144+
}
145+
146+
/**
147+
* Process the flow files pulled using pull size
148+
* @param processContext NiFi process context
149+
* @param processSession Nifi process session
150+
* @param flowFiles List of FlowFile from the process session up to pull size limit
151+
* @throws ProcessException Processing Exception
152+
*/
153+
abstract void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException;
154+
155+
TDF getTDF() {
156+
return new TDF();
157+
}
158+
159+
NanoTDF getNanoTDF(){
160+
return new NanoTDF();
161+
}
162+
163+
@Override
164+
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
165+
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
166+
}
167+
136168
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package io.opentdf.nifi;
2+
3+
import io.opentdf.platform.sdk.Config;
4+
import org.apache.nifi.components.PropertyDescriptor;
5+
import org.apache.nifi.expression.ExpressionLanguageScope;
6+
import org.apache.nifi.flowfile.FlowFile;
7+
import org.apache.nifi.processor.ProcessContext;
8+
import org.apache.nifi.processor.util.StandardValidators;
9+
10+
import java.util.Arrays;
11+
import java.util.Collections;
12+
import java.util.List;
13+
import java.util.Set;
14+
import java.util.stream.Collectors;
15+
16+
/**
17+
* Common utilities for a processor converting content to one of the TDF formats
18+
*/
19+
public abstract class AbstractToProcessor extends AbstractTDFProcessor{
20+
static final String KAS_URL_ATTRIBUTE = "kas_url";
21+
static final String TDF_ATTRIBUTE = "tdf_attribute";
22+
23+
public static final PropertyDescriptor KAS_URL = new org.apache.nifi.components.PropertyDescriptor.Builder()
24+
.name("KAS URL")
25+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
26+
.description("The KAS Url to use for encryption; this is a default if the kas_url attribute is not present in the flow file")
27+
.required(false)
28+
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
29+
.build();
30+
31+
@Override
32+
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
33+
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE, KAS_URL));
34+
}
35+
36+
/**{
37+
* Get the kas urls from a flowfile attribute or if none present fallback to processor configuration KAS URL;
38+
* format is a comma separated list
39+
* @param flowFile
40+
* @param processContext
41+
* @return
42+
* @throws Exception
43+
*/
44+
List<String> getKasUrl(FlowFile flowFile, ProcessContext processContext) throws Exception{
45+
String kasUrlAttribute = flowFile.getAttribute(KAS_URL_ATTRIBUTE);
46+
//check kas url
47+
if (!processContext.getProperty(KAS_URL).isSet() && kasUrlAttribute == null) {
48+
throw new Exception("no " + KAS_URL_ATTRIBUTE + " flowfile attribute and no default KAS URL configured");
49+
}
50+
String kasUrlValues = kasUrlAttribute != null ? kasUrlAttribute : getPropertyValue(processContext.getProperty(KAS_URL)).getValue();
51+
List<String> kasUrls = Arrays.stream(kasUrlValues.split(",")).filter(x->!x.isEmpty()).collect(Collectors.toList());
52+
if (kasUrlValues.isEmpty()){
53+
throw new Exception("no KAS Urls provided");
54+
}
55+
return kasUrls;
56+
}
57+
58+
List<Config.KASInfo> getKASInfoFromKASURLs(List<String> kasUrls){
59+
return kasUrls.stream().map(x->{ var ki = new Config.KASInfo(); ki.URL=x; return ki;}).collect(Collectors.toList());
60+
}
61+
62+
/**
63+
* Get data attributes on a FlowFile from attribute value
64+
* @param flowFile
65+
* @return
66+
* @throws Exception
67+
*/
68+
Set<String> getDataAttributes(FlowFile flowFile) throws Exception{
69+
Set<String> dataAttributes = Arrays.stream((flowFile.getAttribute(TDF_ATTRIBUTE) == null ? "" :
70+
flowFile.getAttribute(TDF_ATTRIBUTE)).split(",")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());
71+
if (dataAttributes.isEmpty()) {
72+
throw new Exception("no data attributes provided via " + TDF_ATTRIBUTE + " flowfile attribute");
73+
}
74+
return dataAttributes;
75+
}
76+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.opentdf.nifi;
2+
3+
import io.opentdf.platform.sdk.SDK;
4+
import org.apache.nifi.annotation.documentation.CapabilityDescription;
5+
import org.apache.nifi.annotation.documentation.Tags;
6+
import org.apache.nifi.flowfile.FlowFile;
7+
import org.apache.nifi.processor.ProcessContext;
8+
import org.apache.nifi.processor.ProcessSession;
9+
import org.apache.nifi.processor.exception.ProcessException;
10+
11+
import java.io.IOException;
12+
import java.nio.ByteBuffer;
13+
import java.util.List;
14+
15+
@CapabilityDescription("Decrypts NanoTDF flow file content")
16+
@Tags({"NanoTDF", "OpenTDF", "Decrypt", "Data Centric Security"})
17+
public class ConvertFromNanoTDF extends AbstractTDFProcessor {
18+
19+
@Override
20+
public void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
21+
SDK sdk = getTDFSDK(processContext);
22+
for (FlowFile flowFile : flowFiles) {
23+
try {
24+
byte[] nanoTDFBytes = readEntireFlowFile(flowFile, processSession);
25+
FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
26+
try {
27+
getNanoTDF().readNanoTDF(ByteBuffer.wrap(nanoTDFBytes), outputStream, sdk.getServices().kas());
28+
} catch (Exception e) {
29+
getLogger().error("error decrypting NanoTDF", e);
30+
throw new IOException(e);
31+
}
32+
});
33+
processSession.transfer(updatedFlowFile, REL_SUCCESS);
34+
} catch (Exception e) {
35+
getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
36+
processSession.transfer(flowFile, REL_FAILURE);
37+
}
38+
}
39+
}
40+
}

nifi-tdf-processors/src/main/java/io/opentdf/nifi/ConvertFromZTDF.java

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.nifi.processor.ProcessContext;
1111
import org.apache.nifi.processor.ProcessSession;
1212
import org.apache.nifi.processor.exception.ProcessException;
13+
import org.apache.nifi.stream.io.StreamUtils;
1314

1415
import java.io.IOException;
1516
import java.nio.channels.SeekableByteChannel;
@@ -23,38 +24,26 @@
2324
public class ConvertFromZTDF extends AbstractTDFProcessor {
2425

2526
@Override
26-
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
27-
return Collections.unmodifiableList(Arrays.asList(SSL_CONTEXT_SERVICE, OPENTDF_CONFIG_SERVICE, FLOWFILE_PULL_SIZE));
28-
}
29-
30-
31-
@Override
32-
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
33-
List<FlowFile> flowFiles = processSession.get(processContext.getProperty(FLOWFILE_PULL_SIZE).asInteger());
34-
if (!flowFiles.isEmpty()) {
35-
SDK sdk = getTDFSDK(processContext);
36-
for (FlowFile flowFile : flowFiles) {
37-
try {
38-
try (SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readEntireFlowFile(flowFile, processSession))) {
39-
FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
40-
try {
41-
getTDF().loadTDF(seekableByteChannel, outputStream, sdk.getServices().kas());
42-
} catch (Exception e) {
43-
getLogger().error("error decrypting ZTDF", e);
44-
throw new IOException(e);
45-
}
46-
});
47-
processSession.transfer(updatedFlowFile, REL_SUCCESS);
48-
}
49-
} catch (Exception e) {
50-
getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
51-
processSession.transfer(flowFile, REL_FAILURE);
27+
void processFlowFiles(ProcessContext processContext, ProcessSession processSession, List<FlowFile> flowFiles) throws ProcessException {
28+
SDK sdk = getTDFSDK(processContext);
29+
for (FlowFile flowFile : flowFiles) {
30+
try {
31+
try (SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readEntireFlowFile(flowFile, processSession))) {
32+
FlowFile updatedFlowFile = processSession.write(flowFile, outputStream -> {
33+
try {
34+
TDF.Reader reader = getTDF().loadTDF(seekableByteChannel, sdk.getServices().kas());
35+
reader.readPayload(outputStream);
36+
} catch (Exception e) {
37+
getLogger().error("error decrypting ZTDF", e);
38+
throw new IOException(e);
39+
}
40+
});
41+
processSession.transfer(updatedFlowFile, REL_SUCCESS);
5242
}
43+
} catch (Exception e) {
44+
getLogger().error(flowFile.getId() + ": error decrypting flowfile", e);
45+
processSession.transfer(flowFile, REL_FAILURE);
5346
}
5447
}
5548
}
56-
57-
TDF getTDF() {
58-
return new TDF();
59-
}
6049
}

0 commit comments

Comments
 (0)