HIVE-28913: When calling the reconnect() method in HiveMetaStoreClient, if HADOOP_PROXY_USER is set, a delegation token must be issued for the proxy user. #5775

Merged
merged 5 commits into from
Apr 28, 2025

Conversation

koodin9
Contributor

@koodin9 koodin9 commented Apr 19, 2025

What changes were proposed in this pull request?

Added logic within the reconnect() method to obtain a Delegation Token for the proxy user if the HADOOP_PROXY_USER environment variable is set.
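
A minimal sketch of what such logic could look like, assuming it runs inside HiveMetaStoreClient (getDelegationToken() and conf are members of the client); the token service string and the error handling below are illustrative assumptions, not necessarily the merged code:

    private void generateProxyUserDelegationToken() throws MetaException {
      String proxyUser = System.getenv("HADOOP_PROXY_USER");
      if (proxyUser == null) {
        return; // no proxy user configured, nothing to do
      }
      try {
        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        // Ask HMS for a token owned by the proxy user, renewable by the Kerberos login user.
        String delegationTokenStr = getDelegationToken(proxyUser, loginUser.getShortUserName());
        String tokenService = "DelegationTokenForHiveMetaStoreServer"; // assumed signature/service name
        SecurityUtils.setTokenStr(UserGroupInformation.getCurrentUser(), delegationTokenStr, tokenService);
        MetastoreConf.setVar(conf, ConfVars.TOKEN_SIGNATURE, tokenService);
      } catch (IOException | TException e) {
        throw new MetaException("Failed to create delegation token for proxy user: " + e.getMessage());
      }
    }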

Why are the changes needed?

When HADOOP_PROXY_USER is configured and the Hive Metastore is restarted, the reconnect() method of the HiveMetaStoreClient used by the Iceberg API fails without this change.

Does this PR introduce any user-facing change?

Yes, this affects users who set the HADOOP_PROXY_USER environment variable: for them, the behavior upon HMS restart changes from failure to success.

How was this patch tested?

Testing was conducted using Kafka Connect with the Iceberg Sink Connector to save data to an Iceberg table.
During the test, while write operations were in progress, the connected Hive Metastore was restarted.

@koodin9 koodin9 changed the title When calling the reconnect() method in HiveMetaStoreClient, if HADOOP_PROXY_USER is set, a delegation token must be issued for the proxy user. HIVE-28913: When calling the reconnect() method in HiveMetaStoreClient, if HADOOP_PROXY_USER is set, a delegation token must be issued for the proxy user. Apr 21, 2025
@koodin9
Contributor Author

koodin9 commented Apr 22, 2025

@prasanthj
Would you mind taking a look at this pull request? Thanks!

@@ -502,6 +506,9 @@ public void reconnect() throws MetaException {
// connection has died and the default connection is likely to be the first array element.
promoteRandomMetaStoreURI();
}

generateProxyUserDelegationToken();
Member

@deniskuzZ deniskuzZ Apr 22, 2025

This way we'll be requesting a new delegation token on every reconnect() attempt. At the moment the delegation token is created in the HiveMetaStoreClient constructor.
What exception is being thrown? Do we need a new delegation token?

Contributor Author

@koodin9 koodin9 Apr 22, 2025

@deniskuzZ
The error stack trace below was generated from Hive version 2.3.x. However, it appears the same issue likely exists in the master branch as well, so I've created this Pull Request.

If the HiveMetaStore is restarted while a service is running with the HADOOP_PROXY_USER environment variable configured, the following error occurs.

[2025-04-19 15:45:48,446] INFO [45337-limtan-ib-g19-3|task-0] Trying to connect to metastore with URI thrift://koodin-test-metastore-1.com:9083 (hive.metastore:410)
[2025-04-19 15:45:48,452] ERROR [45337-limtan-ib-g19-3|task-0] SASL negotiation failure (org.apache.thrift.transport.TSaslTransport:278)
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
	at jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(Unknown Source)
	at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:96)
	at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:236)
	at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:39)
	at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
	at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
	at java.base/java.security.AccessController.doPrivileged(Unknown Source)
	at java.base/javax.security.auth.Subject.doAs(Unknown Source)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:494)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.reconnect(HiveMetaStoreClient.java:341)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:163)
	at jdk.proxy4/jdk.proxy4.$Proxy188.getTable(Unknown Source)
	at org.apache.iceberg.hive.HiveTableOperations.lambda$doRefresh$0(HiveTableOperations.java:146)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
	at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
	at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:146)
	at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
	at org.apache.iceberg.BaseTable.refresh(BaseTable.java:73)
	at io.tabular.iceberg.connect.channel.Worker.write(Worker.java:133)
	at io.tabular.iceberg.connect.channel.TaskImpl.put(TaskImpl.java:51)
	at io.tabular.iceberg.connect.IcebergSinkTask.lambda$put$4(IcebergSinkTask.java:181)
	at java.base/java.security.AccessController.doPrivileged(Unknown Source)
	at java.base/javax.security.auth.Subject.doAs(Unknown Source)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:179)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
	at java.security.jgss/sun.security.jgss.krb5.Krb5InitCredential.getInstance(Unknown Source)
	at java.security.jgss/sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Unknown Source)
	at java.security.jgss/sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Unknown Source)
	at java.security.jgss/sun.security.jgss.GSSManagerImpl.getMechanismContext(Unknown Source)
	at java.security.jgss/sun.security.jgss.GSSContextImpl.initSecContext(Unknown Source)
	at java.security.jgss/sun.security.jgss.GSSContextImpl.initSecContext(Unknown Source)
	... 40 more

Member

Could we request a new token only when the current one is invalid?

Contributor Author

1c4b095

@deniskuzZ
I modified the code to retrieve the tokens from the proxy user's UserGroupInformation (via getCurrentUser()) and return early if a HIVE_DELEGATION_TOKEN already exists.
However, I'm not sure how best to implement the part that fetches the HIVE_DELEGATION_TOKEN and checks whether it is still valid.
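
For reference, a rough sketch of that early-return check (a hypothetical snippet inside HiveMetaStoreClient; the literal token kind string is an assumption based on the log output further below):

    Text hiveTokenKind = new Text("HIVE_DELEGATION_TOKEN");
    for (Token<? extends TokenIdentifier> t : UserGroupInformation.getCurrentUser().getTokens()) {
      if (hiveTokenKind.equals(t.getKind())) {
        return; // a Hive delegation token is already present; skip requesting a new one
      }
    }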

Member

@deniskuzZ deniskuzZ Apr 23, 2025

@koodin9, you can get a token like

String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE);
tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);

do you know if renew helps here?

if (tokenStrForm != null) {
  renewDelegationToken(tokenStrForm);
}

I could only imagine that the issue is caused by the expired TGT and not the HMS service restart per se
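
Putting the two pieces of that suggestion together, a hedged sketch (tokenStrForm, conf, and renewDelegationToken() are assumed to be HiveMetaStoreClient members):

    String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE);
    String tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);
    if (tokenStrForm != null) {
      // An existing token was found in the current UGI; try renewing it instead of requesting a new one.
      renewDelegationToken(tokenStrForm);
    }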

Member

For me it's reasonable to get a new token every time we reconnect, as it costs the same as connecting to HMS and checking that token (plus a call to get a new token if it has expired).

Contributor Author

@deniskuzZ
It seems the process of generating a new token is definitely necessary.
When the HiveMetaStoreClient constructor created the Delegation Token for the proxy user, the result of calling UserGroupInformation.getCurrentUser().getTokens() correctly contained the token, as expected.

[2025-04-24 14:01:33,413] INFO [45337-limtan-ib-g19-3|task-0] finish creating DT. current User(koodin-9 (auth:PROXY) via [email protected] (auth:KERBEROS)) getTokens: [Kind: HIVE_DELEGATION_TOKEN, Service: DelegationTokenForHiveMetaStoreServer, Ident: ...] (hive.metastore:334)

However, when calling UserGroupInformation.getCurrentUser().getTokens() at the point where reconnect() was invoked, it always returned an empty set.
In 3f5a258, logging was added to the SecurityUtils.getTokenStrForm function; when it was called, the results were as follows.

    public static String getTokenStrForm(String tokenSignature) throws IOException {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();

        LOG.info("ok, ugi getTokens? : {}", ugi.getTokens());
        LOG.info("ok, ugi realUser getTokens? : {}", ugi.getRealUser().getTokens());

        Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
                tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
        return token != null ? token.encodeToUrlString() : null;
    }
[2025-04-24 14:32:51,577] INFO [45337-limtan-ib-g19-3|task-2] ok, ugi getTokens? : [] (org.apache.hadoop.hive.metastore.utils.SecurityUtils:159)
[2025-04-24 14:32:51,578] INFO [45337-limtan-ib-g19-3|task-2] ok, ugi realUser getTokens? : [] (org.apache.hadoop.hive.metastore.utils.SecurityUtils:160)

I am not sure why the token information in UserGroupInformation.getCurrentUser().getTokens() disappears after the Hive Metastore restarts. Would you happen to know the reason for this?

Member

No idea, that would require some debugging. However, if UserGroupInformation.getCurrentUser().getTokens() is empty, creating a new token seems to be the only option.

SecurityUtils.setTokenStr(UserGroupInformation.getCurrentUser(), delegationTokenStr,
    delegationTokenPropString);
MetastoreConf.setVar(this.conf, ConfVars.TOKEN_SIGNATURE, delegationTokenPropString);
close();
Member

Move this close() to a finally block.
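
One way to read that suggestion, sketched against the quoted lines (the enclosing try block is an assumption about the surrounding method):

    try {
      SecurityUtils.setTokenStr(UserGroupInformation.getCurrentUser(), delegationTokenStr,
          delegationTokenPropString);
      MetastoreConf.setVar(this.conf, ConfVars.TOKEN_SIGNATURE, delegationTokenPropString);
    } finally {
      close(); // runs whether or not setting the token or the configuration throws
    }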

Contributor Author

c1d5ce3
Thank you for reviewing it.

Member

@deniskuzZ deniskuzZ left a comment

+1, pending tests


@dengzhhu653 dengzhhu653 merged commit 063cfa3 into apache:master Apr 28, 2025
4 checks passed