feat: Adding support for python 3.13 #1666


Merged
merged 47 commits into from
Apr 25, 2025

Conversation

gavin-aguiar
Contributor

@gavin-aguiar gavin-aguiar commented Apr 11, 2025

Description

Proxy Worker Changes (merge to dev)

  • Adds 3.13 to the build & test pipelines
    • For python 3.12, the builds include azure_functions_worker ONLY
    • For python 3.13+, the builds include proxy_worker ONLY
    • We reuse the existing build / testing structure from the current worker, but make some logical changes to test azure_functions_worker (afw) or the proxy worker accordingly, based on the Python version
  • Proxy_worker: receives & returns gRPC messages from the host
    • Majority of the worker functionality is defined in the library worker. The proxy worker will solely receive messages from the host, forward them to the library worker, and respond back.
    • Both the V1 and V2 library worker are included in the image. The proxy worker decides what to import based on the function path.
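The import-selection step described above can be sketched roughly like this. It is a minimal sketch, not the PR's actual code: the entry-file name and the `azure_functions_worker_v1`/`_v2` module names are assumptions based on the description.

```python
import os

# Hypothetical v2-model entry-file name; the real worker resolves this
# via a get_script_file_name() helper.
V2_SCRIPT_FILE = "function_app.py"


def select_library_worker(function_app_directory: str) -> str:
    """Pick which library worker the proxy should import: the v2 worker
    when the app directory contains a v2-model entry file, else v1."""
    v2_entry = os.path.join(function_app_directory, V2_SCRIPT_FILE)
    if os.path.exists(v2_entry):
        return "azure_functions_worker_v2"
    return "azure_functions_worker_v1"
```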

Fixes #


PR information

  • The title of the PR is clear and informative.
  • There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For information on cleaning up the commits in your pull request, see this page.
  • If applicable, the PR references the bug/issue that it fixes in the description.
  • New Unit tests were added for the changes made and CI is passing.

Quality of Code and Contribution Guidelines

gavin-aguiar and others added 25 commits February 5, 2025 15:41
* build: recognize collection_model_binding_data for batch inputs (#1655)

* add cmbd

* Add

* Add

* Rm newline

* Add tests

* Fix cmbd

* Fix test

* Lint

* Rm

* Rm

* Add back newline

* rm ws

* Rm list

* Rm cmbd from cache

* Avoid caching

* Keep cmbd check

* Add comment

* Lint

---------

Co-authored-by: Evan Roman <[email protected]>
Co-authored-by: hallvictoria <[email protected]>

* build: update Python Worker Version to 4.36.1 (#1660)

Co-authored-by: AzureFunctionsPython <[email protected]>

* initial changes

* Update Python SDK Version to 1.23.0 (#1663)

Co-authored-by: AzureFunctionsPython <[email protected]>

* merges from ADO

* merge fixes

* merge fixes

* merge fixes

* merge fixes

* don't run 313 unit tests yet

* changes for builds

---------

Co-authored-by: Evan <[email protected]>
Co-authored-by: Evan Roman <[email protected]>
Co-authored-by: AzureFunctionsPython <[email protected]>
Co-authored-by: AzureFunctionsPython <[email protected]>
@unittest.skipIf(sys.version_info.minor < 13, "For python 3.13+, "
"this logic is in the "
"library worker.")
@unittest.skipIf(sys.version_info.minor >= 13, "For python 3.13+,"


Use a constant to define the supported Python version, to minimize changes in the future.
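The constant-based version of that skip condition might look like the sketch below; the constant and helper names are assumptions (the PR hardcodes 13 at each call site).

```python
import sys
import unittest

# Hypothetical module-level constant replacing the hardcoded 13
PROXY_WORKER_MIN_MINOR = 13


def runs_in_library_worker() -> bool:
    """True when this logic has moved to the library worker (3.13+)."""
    return sys.version_info.minor >= PROXY_WORKER_MIN_MINOR


@unittest.skipIf(runs_in_library_worker(),
                 "For python 3.13+, this logic is in the library worker.")
class TestLegacyWorkerOnly(unittest.TestCase):
    def test_placeholder(self) -> None:
        self.assertTrue(True)
```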

# Extract the minor version as an integer
PY_MINOR="${PY_VER#*.}"

if [ "$PY_MINOR" -ge 13 ]; then


NIT: Consider having a file that contains the supported Python versions and is used by both the ci-unit-tests and ci-e2e-tests.yml files.

@unittest.skipIf(sys.version_info.minor < 13, "For python 3.13+, "
"this logic is in the "
"library worker.")
@unittest.skipIf(sys.version_info.minor >= 13, "For python 3.13+,"


[Checking] You are skipping tests for 3.13; is that intended?

Member


Why any change in this?

@@ -62,6 +65,9 @@ jobs:
Python312V4:
pythonVersion: '3.12'
workerPath: 'python/prodV4/worker.py'
Python313V4:
pythonVersion: '3.13'
workerPath: 'python/proxyV4/worker.py'
Member


Can the worker Path be a variable at the top?

@@ -63,6 +63,15 @@ jobs:
SQL_CONNECTION: $(LinuxSqlConnectionString312)
EVENTGRID_URI: $(LinuxEventGridTopicUriString312)
EVENTGRID_CONNECTION: $(LinuxEventGridConnectionKeyString312)
Python313:
PYTHON_VERSION: '3.13'
Member


E2E tests are re-using 312 resources for now until the appropriate 313 resources are created (cosmosdb, sql, eventgrid).

For this - is there a helper script to smooth the process of creation? If not, please add it to the 3.13 backlog.

@@ -100,6 +106,9 @@ jobs:
Python312V4:
Member


In the long run, we should simplify these blocks; the YAML is unnecessarily long.

@@ -129,7 +145,6 @@ jobs:
Write-Host "##vso[task.setvariable variable=skipTest;]false"
}
displayName: 'Set skipTest variable'
condition: or(eq(variables.isSdkRelease, true), eq(variables['USETESTPYTHONSDK'], true))
Member


What did this do?

cp -r azure_functions_worker/protos "$BUILD_SOURCESDIRECTORY/deps/azure_functions_worker"
else
cp -r proxy_worker/protos "$BUILD_SOURCESDIRECTORY/deps/proxy_worker"
Member


In the long run, the name shouldn't change; can it be done in this iteration itself?

Member


As this seems unnecessary.

UNIX_SHARED_MEMORY_DIRECTORIES,
)
from azure_functions_worker.utils.common import get_app_setting, is_envvar_true
if sys.version_info.minor < 13:
Member


I really don't like this :( Better to create a 3.13-specific util for now.

@@ -43,7 +43,8 @@ def test_flake8(self):

try:
subprocess.run(
[sys.executable, '-m', 'flake8', '--config', str(config_path)],
[sys.executable, '-m', 'flake8', '--config', str(config_path),
'azure_functions_worker',],
Member


Would it not "flake" other folders?

@@ -16,4 +15,10 @@ def add_script_root_to_sys_path():

if __name__ == '__main__':
add_script_root_to_sys_path()
main.main()
minor_version = sys.version_info[1]
Member


We wanted to get out of the whole minor version thing - Please create a different test folder if needed.

Member


Why create a separate worker.py?

# third-party user packages over worker packages in PYTHONPATH
user_pkg_paths = determine_user_pkg_paths()
joined_pkg_paths = os.pathsep.join(user_pkg_paths)
env['PYTHONPATH'] = f'{joined_pkg_paths}:{func_worker_dir}'
Member


This needs a documentation update

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

VERSION = "4.36.1"
Member


Any specific reason it should be in a Python file, or could it be in pyproject.toml itself?

@@ -6,8 +6,9 @@
import azure.functions as func
from tests.utils import testutils

from azure_functions_worker import protos
from azure_functions_worker.bindings import datumdef, meta
if sys.version_info.minor < 13:
Member


Still sad :(

logger.info("Args: %s", args)
logger.info('Starting proxy worker.')
logger.info('Worker ID: %s, Request ID: %s, Host Address: %s:%s',
args.worker_id, args.request_id, args.host, args.port)
Member


Change to one log, or to debug logs.

return getattr(_invocation_id_local, 'invocation_id', None)


class AsyncLoggingHandler(logging.Handler):
Member


Exploration for later - to move to logging

return int(max_workers) if max_workers else None

async def _handle__worker_init_request(self, request):
logger.info('Received WorkerInitRequest, '
Member


the new logs added are intended to help confirm that the library worker is receiving requests appropriately.

No new info logs; make them debug. During cold start, adding a log should be fine, but only after Evan's validation.


global _library_worker
directory = request.worker_init_request.function_app_directory
v2_directory = os.path.join(directory, get_script_file_name())
Member


Rename - make it explicit that it's the function_app entry file for the v2 model.

v2_directory = os.path.join(directory, get_script_file_name())
if os.path.exists(v2_directory):
try:
import azure_functions_worker_v2 # NoQA
Member


Explore importing this into a separate variable above:

x = importlib.import_module("azure_functions_worker_v2")

x.handle_dispatch_init in your code.

Decide if you want to run importlib again based on whether the return of DependencyManager.prioritize_customer_dependencies(directory) tells you that the customer brought in a runtime via requirements.txt.


def on_logging(self, record: logging.LogRecord,
formatted_msg: str) -> None:
if record.levelno >= logging.CRITICAL:


This uses a long if-elif chain to map log levels; consider defining a map to clean up the code.
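A minimal sketch of that suggestion, replacing the if-elif chain with a lookup table walked from most to least severe. The gRPC level names here are assumptions, not the worker's actual `RpcLog` enum values.

```python
import logging

# Hypothetical stdlib-level -> RpcLog-level-name mapping
LEVEL_TO_RPC_LOG = {
    logging.CRITICAL: "Critical",
    logging.ERROR: "Error",
    logging.WARNING: "Warning",
    logging.INFO: "Information",
    logging.DEBUG: "Debug",
}


def map_log_level(levelno: int) -> str:
    """Map a stdlib level number to the closest gRPC log level name."""
    # Walk thresholds from most to least severe, as the if-elif chain did.
    for threshold, name in sorted(LEVEL_TO_RPC_LOG.items(), reverse=True):
        if levelno >= threshold:
            return name
    return "Trace"
```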

Member


Can this be switch case itself?

async def _dispatch_grpc_request(self, request):
content_type = request.WhichOneof("content")

match content_type:


Instead of if-else, use a dictionary

Member


So something like

handler = handlers[content_type]

where handlers is a dict mapping <content_type: str> to <handler: callable>.
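A hedged sketch of that dispatch-table idea. The request-type keys are modeled on the gRPC oneof field names quoted above; the handler bodies and return values are placeholders.

```python
import asyncio


class Dispatcher:
    """Dictionary-based dispatch instead of a match/if-elif chain."""

    def __init__(self):
        # Map each oneof content-type name to its handler coroutine.
        self._handlers = {
            "worker_init_request": self._handle_worker_init_request,
            "function_load_request": self._handle_function_load_request,
        }

    async def dispatch(self, content_type: str, request):
        handler = self._handlers.get(content_type)
        if handler is None:
            raise ValueError(f"Unknown request type: {content_type}")
        return await handler(request)

    async def _handle_worker_init_request(self, request):
        return "init-ok"       # placeholder for the real init logic

    async def _handle_function_load_request(self, request):
        return "load-ok"       # placeholder for the real load logic
```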

self._grpc_resp_queue.put_nowait(self._GRPC_STOP_RESPONSE)
self._grpc_thread.join()
self._grpc_thread = None


[Blocker] You are joining the thread and cleaning up; an exception from these could leave resources open (or in an undefined state). I would encourage using exception handling and finally to ensure thread safety.

Member


which will leave resources open (or in an undefined state)

This is already at the end of the worker's lifetime, and we can discard any state.

try:
forever = self._loop.create_future()

self._grpc_resp_queue.put_nowait(


[Checking, unaware of Python data structures] If multiple threads attempt to put into the queue, is that thread safe? In Java we use something called BlockingQueue or ConcurrentQueue.
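For reference, Python's stdlib `queue.Queue` is thread-safe (it is the closest analogue of Java's BlockingQueue), so concurrent `put_nowait` calls are safe. A quick demonstration:

```python
import queue
import threading

q = queue.Queue()


def producer(start: int) -> None:
    # put_nowait is safe to call concurrently from many threads
    for i in range(start, start + 100):
        q.put_nowait(i)


threads = [threading.Thread(target=producer, args=(n * 100,))
           for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All 400 items arrive exactly once, regardless of interleaving.
assert q.qsize() == 400
```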


def stop(self) -> None:
if self._grpc_thread is not None:
self._grpc_resp_queue.put_nowait(self._GRPC_STOP_RESPONSE)


Also, you should consider having a lock; if two threads access this code simultaneously, it will lead to a race condition.
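One way to address both the lock and the try/finally concerns, sketched under the assumption that stop() may be called from multiple threads. The class and attribute names are modeled on the snippets quoted above, not the actual PR code; the thread target is a stand-in for the real gRPC response loop.

```python
import queue
import threading


class GrpcConnection:
    """Sketch of an idempotent, lock-guarded stop()."""

    _GRPC_STOP_RESPONSE = object()

    def __init__(self):
        self._grpc_resp_queue = queue.Queue()
        self._stop_lock = threading.Lock()
        # Stand-in for the real response loop: blocks until one item arrives.
        self._grpc_thread = threading.Thread(target=self._grpc_resp_queue.get)
        self._grpc_thread.start()

    def stop(self) -> None:
        with self._stop_lock:                  # two callers cannot race here
            thread, self._grpc_thread = self._grpc_thread, None
        if thread is None:
            return                             # already stopped: idempotent
        try:
            self._grpc_resp_queue.put_nowait(self._GRPC_STOP_RESPONSE)
            thread.join()
        finally:
            # Even if join() raises, the thread reference is already cleared,
            # so a retry cannot double-join; further cleanup would go here.
            pass
```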

try:
for req in grpc_req_stream:
self._loop.call_soon_threadsafe(
self._loop.create_task, self._dispatch_grpc_request(req))


[Checking] Is _dispatch_grpc_request itself thread safe? You might need to synchronize it.

if logger and handler:
handler.flush()
logger.removeHandler(handler)


[Checking] Do you also need to close the handler?

Comment on lines +51 to +54
logger.info("Args: %s", args)
logger.info(
'Starting proxy worker. Worker ID: %s, Request ID: %s, Host Address: %s:%s',
args.worker_id, args.request_id, args.host, args.port)
Member


Merging these logs

cls._add_to_sys_path(cls.cx_deps_path, True)
cls._add_to_sys_path(working_directory, False)

logger.info(f'Finished prioritize_customer_dependencies: {sys.path}')
Member


Please convert to debug (when GA)

Comment on lines +147 to +149
cls._add_to_sys_path(cls.worker_deps_path, True)
cls._add_to_sys_path(cls.cx_deps_path, True)
cls._add_to_sys_path(working_directory, False)
Member


Can adding to the front or the back be more explicit and readable?
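A sketch of that readability suggestion: replace the boolean flag with explicitly named helpers. The helper names are assumptions, not the PR's API.

```python
import sys


def prepend_to_sys_path(path: str) -> None:
    """Put the path first so it wins import resolution."""
    if path and path not in sys.path:
        sys.path.insert(0, path)


def append_to_sys_path(path: str) -> None:
    """Put the path last so it only backstops other entries."""
    if path and path not in sys.path:
        sys.path.append(path)
```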

logger.info(f'Finished prioritize_customer_dependencies: {sys.path}')

@classmethod
def _add_to_sys_path(cls, path: str, add_to_first: bool):
Member


Move private methods earlier than public methods.

Comment on lines +214 to +231
@staticmethod
def _get_cx_deps_path() -> str:
"""Get the directory storing the customer's third-party libraries.

Returns
-------
str
Core Tools: path to customer's site packages
Linux Dedicated/Premium: path to customer's site packages
Linux Consumption: empty string
"""
prefix: Optional[str] = os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT)
cx_paths: List[str] = [
p for p in sys.path
if prefix and p.startswith(prefix) and ('site-packages' in p)
]
# Return first or default of customer path
return (cx_paths or [''])[0]
Member


Why is "AZURE_WEBJOBS_SCRIPT_ROOT" showing up here?

Member


First, explore using the directory from the gRPC request itself, or use cls.Cx_working_directory.

The worker packages path
"""
# 1. Try to parse the absolute path python/3.13/LINUX/X64 in sys.path
r = re.compile(r'.*python(\/|\\)\d+\.\d+(\/|\\)(WINDOWS|LINUX|OSX).*')
Member


This regex should be at the top as a constant.

# Don't reload proxy_worker
to_be_cleared_from_cache = set([
module_name for module_name in not_builtin
if not module_name.startswith('proxy_worker')
Member


The name shouldn't be hardcoded; it should be moved to a constant.


# App Setting constants
PYTHON_ENABLE_DEBUG_LOGGING = "PYTHON_ENABLE_DEBUG_LOGGING"
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
Member


The proxy worker shouldn't create the thread pool, so this constant shouldn't be here.

Comment on lines +138 to +140
self._sync_call_tp: Optional[concurrent.futures.Executor] = (
self._create_sync_call_tp(self._get_sync_tp_max_workers()))

Member


We should explore moving this logic into the library worker rather than putting it in the proxy worker.

@classmethod
async def connect(cls, host: str, port: int, worker_id: str,
request_id: str, connect_timeout: float):
loop = asyncio.events.get_event_loop()
Member


Add a task: an app setting for enabling "uvloop"; add this feature.

uvloop.new_event_loop vs asyncio.events.get_event_loop()
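The opt-in could be sketched roughly as below. The app setting name is an assumption (the PR only tracks this as a follow-up task), and uvloop is an optional third-party dependency, so the sketch falls back to the default asyncio loop when it is absent or disabled.

```python
import asyncio
import os

# Hypothetical app setting name for the uvloop opt-in
PYTHON_ENABLE_UVLOOP = "PYTHON_ENABLE_UVLOOP"


def create_event_loop() -> asyncio.AbstractEventLoop:
    """Return a uvloop event loop when the opt-in setting is on and
    uvloop is installed; otherwise fall back to a default asyncio loop."""
    if os.getenv(PYTHON_ENABLE_UVLOOP, "").lower() in ("1", "true"):
        try:
            import uvloop  # third-party; optional dependency
            return uvloop.new_event_loop()
        except ImportError:
            pass  # setting on but uvloop missing: degrade gracefully
    return asyncio.new_event_loop()
```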


Member

@vrdmr vrdmr left a comment


Looks good to go in;

Follow-up tasks to be created, plus minor tests.

@gavin-aguiar gavin-aguiar merged commit c3899d8 into dev Apr 25, 2025
24 of 31 checks passed
@gavin-aguiar gavin-aguiar deleted the gaaguiar/proxy_worker branch April 25, 2025 18:07