Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Commit b832e3a

Browse files
author
gavingaozhangmin
committed
sql support pulsar.client configuration
1 parent fbb8d17 commit b832e3a

File tree

1 file changed

+14
-5
lines changed

1 file changed

+14
-5
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@
2424
import org.apache.pulsar.client.api.PulsarClientException;
2525
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
2626
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
27+
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
28+
import org.apache.pulsar.shade.com.google.common.collect.Maps;
2729
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
2830

31+
import java.util.Map;
2932
import java.util.Properties;
33+
import java.util.stream.Collectors;
3034

3135
/** Utility to create Pulsar Admin Client from adminUrl and clientConfigurationData. */
3236
public class PulsarClientUtils {
@@ -56,13 +60,18 @@ private static Authentication getAuth(ClientConfigurationData conf)
5660
}
5761

5862
public static ClientConfigurationData newClientConf(String serviceUrl, Properties properties) {
63+
Map<String, Object> clientConfData = getClientParams(Maps.fromProperties(properties));
5964
ClientConfigurationData clientConf = new ClientConfigurationData();
65+
clientConf = ConfigurationDataUtils.loadData(clientConfData, clientConf, ClientConfigurationData.class);
6066
clientConf.setServiceUrl(serviceUrl);
61-
if (properties != null) {
62-
clientConf.setAuthParams(properties.getProperty(PulsarOptions.AUTH_PARAMS_KEY));
63-
clientConf.setAuthPluginClassName(
64-
properties.getProperty(PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY));
65-
}
6667
return clientConf;
6768
}
69+
70+
public static Map<String, Object> getClientParams(Map<String, String> parameters) {
71+
return parameters.keySet().stream()
72+
.filter(k -> k.startsWith(PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX))
73+
.collect(Collectors.toMap(
74+
k -> k.substring(PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX.length()),
75+
k -> parameters.get(k)));
76+
}
6877
}

0 commit comments

Comments
 (0)