Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-14109 Refactored remaining processors and control services to be uniform when creating properties and relationships. #9600

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,16 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

private static final List<PropertyDescriptor> propertyDescriptors;

static {
propertyDescriptors = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION);
}

protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
return propertyDescriptors;
}
protected static final List<PropertyDescriptor> PARENT_PROPERTIES = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the parent class 'getCommonPropertyDescriptors' method is removed and instead a subclass is supposed to know to use the static list of descriptors as they build their own lists. I think this weakens what the author intended in building that class.

Also there is at least one example much later on in this PR where you kept the parent method. I think keeping the parent method better conveys intent to any subclassers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will revert this back.


private BlockingQueue<AMQPResource<T>> resourceQueue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
Expand Down Expand Up @@ -163,28 +162,24 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.description("All FlowFiles that are received from the AMQP queue are routed to this relationship")
.build();

private static final List<PropertyDescriptor> propertyDescriptors;
private static final Set<Relationship> relationships;

private static final ObjectMapper objectMapper;

static {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUEUE);
properties.add(AUTO_ACKNOWLEDGE);
properties.add(BATCH_SIZE);
properties.add(PREFETCH_COUNT);
properties.add(HEADER_FORMAT);
properties.add(HEADER_KEY_PREFIX);
properties.add(HEADER_SEPARATOR);
properties.add(REMOVE_CURLY_BRACES);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);

relationships = Set.of(REL_SUCCESS);

objectMapper = new ObjectMapper();
}
private static final List<PropertyDescriptor> PROPERTIES = Stream.concat(
Stream.of(
QUEUE,
AUTO_ACKNOWLEDGE,
BATCH_SIZE,
PREFETCH_COUNT,
HEADER_FORMAT,
HEADER_KEY_PREFIX,
HEADER_SEPARATOR,
REMOVE_CURLY_BRACES
), PARENT_PROPERTIES.stream()
).toList();

private static final Set<Relationship> RELATIONSHIPS = Set.of(
REL_SUCCESS
);

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Will construct a {@link FlowFile} containing the body of the consumed AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
Expand Down Expand Up @@ -303,7 +298,7 @@ private static String convertMapToString(Map<String, Object> headers, String val
}

private static String convertMapToJSONString(Map<String, Object> headers) throws JsonProcessingException {
return objectMapper.writeValueAsString(headers);
return OBJECT_MAPPER.writeValueAsString(headers);
}

@Override
Expand All @@ -320,12 +315,12 @@ protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext contex

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
return PROPERTIES;
}

@Override
public Set<Relationship> getRelationships() {
return relationships;
return RELATIONSHIPS;
}

public enum OutputHeaderFormat implements DescribedValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
HEADERS_PATTERN,
HEADER_SEPARATOR
),
getCommonPropertyDescriptors().stream()
PARENT_PROPERTIES.stream()
).toList();

private final static Set<Relationship> RELATIONSHIPS = Set.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class StandardAsanaClientProviderService extends AbstractControllerServic
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();

protected static final List<PropertyDescriptor> DESCRIPTORS = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
PROP_ASANA_API_BASE_URL,
PROP_ASANA_PERSONAL_ACCESS_TOKEN,
PROP_ASANA_WORKSPACE_NAME
Expand All @@ -83,7 +83,7 @@ public class StandardAsanaClientProviderService extends AbstractControllerServic

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
return PROPERTIES;
}

@OnEnabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand Down Expand Up @@ -110,6 +109,12 @@ STRATEGY_NAME_CSE_C, new ClientSideCEncryptionStrategy()
.defaultValue(RegionUtilV1.createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
ENCRYPTION_STRATEGY,
ENCRYPTION_VALUE,
KMS_REGION
);

private String keyValue = "";
private String kmsRegion = "";
private S3EncryptionStrategy encryptionStrategy = new NoOpEncryptionStrategy();
Expand Down Expand Up @@ -189,11 +194,7 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ENCRYPTION_STRATEGY);
properties.add(ENCRYPTION_VALUE);
properties.add(KMS_REGION);
return Collections.unmodifiableList(properties);
return PROPERTIES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -137,7 +136,7 @@ public class AmazonGlueSchemaRegistry extends AbstractControllerService implemen

private static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(PROXY_SPECS);

private static final List<PropertyDescriptor> PROPERTIES = new ArrayList<>(Arrays.asList(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA_REGISTRY_NAME,
REGION,
COMMUNICATIONS_TIMEOUT,
Expand All @@ -146,7 +145,7 @@ public class AmazonGlueSchemaRegistry extends AbstractControllerService implemen
AWS_CREDENTIALS_PROVIDER_SERVICE,
PROXY_CONFIGURATION_SERVICE,
SSL_CONTEXT_SERVICE
));
);


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public class StandardAzureCredentialsControllerService extends AbstractControlle
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
CREDENTIAL_CONFIGURATION_STRATEGY, MANAGED_IDENTITY_CLIENT_ID
CREDENTIAL_CONFIGURATION_STRATEGY,
MANAGED_IDENTITY_CLIENT_ID
);

private TokenCredential credentials;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ protected void createCosmosClient(final String uri, final String accessKey, fina
.buildClient();
}

static List<PropertyDescriptor> descriptors = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
AzureCosmosDBUtils.URI,
AzureCosmosDBUtils.DB_ACCESS_KEY,
AzureCosmosDBUtils.CONSISTENCY
);

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
return PROPERTIES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class StandardKustoIngestService extends AbstractControllerService implem
.addValidator(StandardValidators.URL_VALIDATOR)
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
AUTHENTICATION_STRATEGY,
APPLICATION_CLIENT_ID,
APPLICATION_KEY,
Expand All @@ -127,7 +127,7 @@ public class StandardKustoIngestService extends AbstractControllerService implem

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
return PROPERTIES;
}

@OnEnabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class StandardKustoQueryService extends AbstractControllerService impleme
.dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS)
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
CLUSTER_URI,
AUTHENTICATION_STRATEGY,
APPLICATION_CLIENT_ID,
Expand All @@ -95,7 +95,7 @@ public class StandardKustoQueryService extends AbstractControllerService impleme

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
return PROPERTIES;
}

public static final Pair<String, String> NIFI_SOURCE = Pair.of("processor", "nifi-source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement
.required(false)
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
SERVICE_BUS_ENDPOINT,
EVENT_HUB_NAMESPACE,
EVENT_HUB_NAME,
Expand All @@ -143,7 +143,7 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
return PROPERTIES;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I look through here it is clearly inconsistent whether we use the term properties, descriptors, or property descriptors. However, we should be careful. These things we're messing with in all these files are PropertyDescriptor objects. They're not actually properties. If we're going to establish more consistency we should consider using the term DESCRIPTOR or PROPERTY_DESCRIPTOR but probably not simply PROPERTIES

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about PROPERTY_DESCRIPTORS (plural)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry I meant the plural form (what it was originally).

}

protected EventHubProducerClient createEventHubClient(final String namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,23 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
.sensitive(true)
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA_REGISTRY_URLS,
SSL_CONTEXT,
TIMEOUT,
CACHE_SIZE,
CACHE_EXPIRATION,
AUTHENTICATION_TYPE,
USERNAME,
PASSWORD
);

private volatile SchemaRegistryClient client;


@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SCHEMA_REGISTRY_URLS);
properties.add(SSL_CONTEXT);
properties.add(TIMEOUT);
properties.add(CACHE_SIZE);
properties.add(CACHE_EXPIRATION);
properties.add(AUTHENTICATION_TYPE);
properties.add(USERNAME);
properties.add(PASSWORD);
return properties;
return PROPERTIES;
}

private static final Validator REQUEST_HEADER_VALIDATOR = new Validator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.nifi.services.dropbox;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
Expand Down Expand Up @@ -77,16 +75,12 @@ public class StandardDropboxCredentialService extends AbstractControllerService
.required(true)
.build();

private static final List<PropertyDescriptor> PROPERTIES;

static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(APP_KEY);
props.add(APP_SECRET);
props.add(ACCESS_TOKEN);
props.add(REFRESH_TOKEN);
PROPERTIES = Collections.unmodifiableList(props);
}
private static final List<PropertyDescriptor> PROPERTIES = List.of(
APP_KEY,
APP_SECRET,
ACCESS_TOKEN,
REFRESH_TOKEN
);

private DropboxCredentialDetails credential;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,30 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im

private ObjectMapper mapper;

private static final List<PropertyDescriptor> properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, API_KEY_ID, API_KEY,
PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET,
SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, SNIFF_CLUSTER_NODES,
SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY);
private static final List<PropertyDescriptor> PROPERTIES = List.of(
HTTP_HOSTS,
PATH_PREFIX,
AUTHORIZATION_SCHEME,
USERNAME,
PASSWORD,
API_KEY_ID,
API_KEY,
PROP_SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE,
CONNECT_TIMEOUT,
SOCKET_TIMEOUT,
CHARSET,
SUPPRESS_NULLS,
COMPRESSION,
SEND_META_HEADER,
STRICT_DEPRECATION,
NODE_SELECTOR,
SNIFF_CLUSTER_NODES,
SNIFFER_INTERVAL,
SNIFFER_REQUEST_TIMEOUT,
SNIFF_ON_FAILURE,
SNIFFER_FAILURE_DELAY
);

private RestClient client;

Expand All @@ -117,7 +137,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
return PROPERTIES;
}

@Override
Expand Down
Loading
Loading