Skip to content

fix: remaining CodeQL fixes #1612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dev = [
"flask",
"fastapi~=0.103.2",
"pydantic",
"pycryptodome==3.*",
"flake8==5.*; python_version == '3.7'",
"flake8==6.*; python_version >= '3.8'",
"mypy",
Expand All @@ -69,7 +68,8 @@ dev = [
"pandas",
"numpy",
"pre-commit",
"invoke"
"invoke",
"cryptography"
]
test-http-v2 = [
"azurefunctions-extensions-http-fastapi",
Expand Down
14 changes: 8 additions & 6 deletions tests/unittests/test_http_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,18 +415,20 @@ def test_response_cookie_header_nullable_double_err(self):
self.assertFalse("Set-Cookie" in r.headers)

def check_log_print_to_console_stdout(self, host_out: typing.List[str]):
# System logs stdout should not exist in host_out
self.assertNotIn('Secret42', host_out)
# System logs stdout should exist in host_out
self.assertIn('Secret42', host_out)

@skipIf(sys.version_info < (3, 9, 0),
"Skip the tests for Python 3.8 and below")
def test_print_to_console_stderr(self):
r = self.webhost.request('GET', 'print_logging?console=true'
'&message=Secret42&is_stderr=true')
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, 'OK-print-logging')

def check_log_print_to_console_stderr(self, host_out: typing.List[str], ):
# System logs stderr should not exist in host_out
self.assertNotIn('Secret42', host_out)
# System logs stderr should exist in host_out
self.assertIn('Secret42', host_out)

def test_hijack_current_event_loop(self):
r = self.webhost.request('GET', 'hijack_current_event_loop/')
Expand All @@ -443,8 +445,8 @@ def check_log_hijack_current_event_loop(self, host_out: typing.List[str]):
self.assertIn('parallelly_log_custom at custom_logger', host_out)
self.assertIn('callsoon_log', host_out)

# System logs should not exist in host_out
self.assertNotIn('parallelly_log_system at disguised_logger', host_out)
# System logs should exist in host_out
self.assertIn('parallelly_log_system at disguised_logger', host_out)

@skipIf(sys.version_info.minor < 11,
"The context param is only available for 3.11+")
Expand Down
10 changes: 6 additions & 4 deletions tests/unittests/test_http_functions_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,17 @@ def test_response_cookie_header_nullable_bool_err(self):
self.assertEqual(r.status_code, 200)
self.assertTrue("Set-Cookie" in r.headers)

@skipIf(sys.version_info < (3, 9, 0),
"Skip the tests for Python 3.8 and below")
def test_print_to_console_stderr(self):
r = self.webhost.request('GET', 'print_logging?console=true'
'&message=Secret42&is_stderr=true')
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, '"OK-print-logging"')

def check_log_print_to_console_stderr(self, host_out: typing.List[str], ):
# System logs stderr should not exist in host_out
self.assertNotIn('Secret42', host_out)
# System logs stderr now exist in host_out
self.assertIn('Secret42', host_out)

def test_hijack_current_event_loop(self):
r = self.webhost.request('GET', 'hijack_current_event_loop/')
Expand All @@ -417,8 +419,8 @@ def check_log_hijack_current_event_loop(self, host_out: typing.List[str]):
self.assertIn('parallelly_log_custom at custom_logger', host_out)
self.assertIn('callsoon_log', host_out)

# System logs should not exist in host_out
self.assertNotIn('parallelly_log_system at disguised_logger', host_out)
# System logs now exist in host_out
self.assertIn('parallelly_log_system at disguised_logger', host_out)

def test_no_type_hint(self):
r = self.webhost.request('GET', 'no_type_hint')
Expand Down
16 changes: 8 additions & 8 deletions tests/unittests/test_log_filtering_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ def test_info_with_sdk_logging(self):

def check_log_info_with_sdk_logging(self, host_out: typing.List[str]):
# See TestLogFilteringFunctions docstring
# System log should not be captured in console
self.assertNotIn('sdk_logger info', host_out)
self.assertNotIn('sdk_logger warning', host_out)
self.assertNotIn('sdk_logger error', host_out)
# System log should be captured in console
self.assertIn('sdk_logger info', host_out)
self.assertIn('sdk_logger warning', host_out)
self.assertIn('sdk_logger error', host_out)
self.assertNotIn('sdk_logger debug', host_out)

def test_info_with_sdk_submodule_logging(self):
Expand All @@ -101,8 +101,8 @@ def test_info_with_sdk_submodule_logging(self):
def check_log_info_with_sdk_submodule_logging(self,
host_out: typing.List[str]):
# See TestLogFilteringFunctions docstring
# System log should not be captured in console
self.assertNotIn('sdk_submodule_logger info', host_out)
self.assertNotIn('sdk_submodule_logger warning', host_out)
self.assertNotIn('sdk_submodule_logger error', host_out)
# System log should be captured in console
self.assertIn('sdk_submodule_logger info', host_out)
self.assertIn('sdk_submodule_logger warning', host_out)
self.assertIn('sdk_submodule_logger error', host_out)
self.assertNotIn('sdk_submodule_logger debug', host_out)
29 changes: 18 additions & 11 deletions tests/unittests/test_third_party_http_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import pathlib
import re
import typing
import urllib.parse
import base64
import sys

from unittest import skipIf
from unittest.mock import patch

from tests.utils import testutils
Expand Down Expand Up @@ -113,9 +115,11 @@ def test_print_to_console_stdout(self):

def check_log_print_to_console_stdout(self,
host_out: typing.List[str]):
# System logs stdout should not exist in host_out
self.assertNotIn('Secret42', host_out)
# System logs stdout now exist in host_out
self.assertIn('Secret42', host_out)

@skipIf(sys.version_info < (3, 9, 0),
"Skip the tests for Python 3.8 and below")
def test_print_to_console_stderr(self):
r = self.webhost.request('GET', 'print_logging?console=true'
'&message=Secret42&is_stderr=true',
Expand All @@ -125,23 +129,26 @@ def test_print_to_console_stderr(self):

def check_log_print_to_console_stderr(self,
host_out: typing.List[str], ):
# System logs stderr should not exist in host_out
self.assertNotIn('Secret42', host_out)
# System logs stderr now exist in host_out
self.assertIn('Secret42', host_out)

def test_raw_body_bytes(self):
parent_dir = pathlib.Path(__file__).parent.parent
image_file = parent_dir / 'unittests/resources/functions.png'
with open(image_file, 'rb') as image:
img = image.read()
sanitized_image = urllib.parse.quote(img)
sanitized_img_len = len(sanitized_image)
encoded_image = base64.b64encode(img).decode('utf-8')
html_img_tag = \
f'<img src="data:image/png;base64,{encoded_image}" alt="PNG Image"/>' # noqa
sanitized_img_len = len(html_img_tag)
r = self.webhost.request('POST', 'raw_body_bytes', data=img,
no_prefix=True)

received_body_len = int(r.headers['body-len'])
self.assertEqual(received_body_len, sanitized_img_len)

body = urllib.parse.unquote_to_bytes(r.content)
encoded_image_data = encoded_image.split(",")[0]
body = base64.b64decode(encoded_image_data)
try:
received_img_file = parent_dir / 'received_img.png'
with open(received_img_file, 'wb') as received_img:
Expand Down Expand Up @@ -217,9 +224,9 @@ def check_log_hijack_current_event_loop(self,
self.assertIn('parallelly_log_custom at custom_logger', host_out)
self.assertIn('callsoon_log', host_out)

# System logs should not exist in host_out
self.assertNotIn('parallelly_log_system at disguised_logger',
host_out)
# System logs now exist in host_out
self.assertIn('parallelly_log_system at disguised_logger',
host_out)


class TestWsgiHttpFunctions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import re
import sys
from urllib.request import urlopen
import urllib.parse
import base64

import azure.functions as func
from fastapi import FastAPI, Request, Response
Expand Down Expand Up @@ -132,10 +132,13 @@ async def print_logging(message: str = "", flush: str = 'false',

@fast_app.post("/raw_body_bytes")
async def raw_body_bytes(request: Request):
raw_body = await request.body()
sanitized_body = urllib.parse.quote(raw_body)
return Response(content=sanitized_body,
headers={'body-len': str(len(sanitized_body))})
body = await request.body()

base64_encoded = base64.b64encode(body).decode('utf-8')
html_img_tag = \
f'<img src="data:image/png;base64,{base64_encoded}" alt="PNG Image"/>'

return Response(html_img_tag, headers={'body-len': str(len(html_img_tag))})


@fast_app.get("/return_http_no_body")
Expand All @@ -150,17 +153,29 @@ async def return_http(request: Request):

@fast_app.get("/return_http_redirect")
async def return_http_redirect(request: Request, code: str = ''):
allowed_url_pattern = r"^http://.+"
# Expected format: 127.0.0.1:<port>
host_and_port = request.url.components[1]

# Validate to ensure it's a valid host and port structure
match = re.match(r'^127\.0\.0\.1:(\d+)$', host_and_port)
if not match:
return Response("Invalid request", status_code=400)

# Validate port is within specific range
port = int(match.group(1))
if port < 50000 or port > 65999:
return Response("Invalid port", status_code=400)

# Validate the code param
allowed_codes = ['', 'testFunctionKey']
if code not in allowed_codes:
return Response("Invalid code", status_code=400)

# Return after all validation succeeds
location = 'return_http?code={}'.format(code)
redirect_url = f"http://{request.url.components[1]}/{location}"
if re.match(allowed_url_pattern, redirect_url):
# Redirect URL is in the expected format
return RedirectResponse(status_code=302,
url=redirect_url)
# Redirect URL was not in the expected format
return RedirectResponse(status_code=302,
url='/')
url=f"http://{host_and_port}/"
f"{location}")


@fast_app.get("/unhandled_error")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys
from urllib.request import urlopen
import urllib.parse
import base64

import azure.functions as func
from flask import Flask, Response, redirect, request, url_for
Expand Down Expand Up @@ -62,8 +62,11 @@ def print_logging():
def raw_body_bytes():
body = request.get_data()

sanitized_body = urllib.parse.quote(body)
return Response(sanitized_body, headers={'body-len': str(len(sanitized_body))})
base64_encoded = base64.b64encode(body).decode('utf-8')
html_img_tag = \
f'<img src="data:image/png;base64,{base64_encoded}" alt="PNG Image"/>'

return Response(html_img_tag, headers={'body-len': str(len(html_img_tag))})


@flask_app.get("/return_http_no_body")
Expand Down
36 changes: 27 additions & 9 deletions tests/utils/testutils_lc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
from zipfile import ZipFile

import requests
from Crypto.Cipher import AES
from Crypto.Hash.SHA256 import SHA256Hash
from Crypto.Util.Padding import pad
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import padding

from tests.utils.constants import PROJECT_ROOT

# Linux Consumption Testing Constants
Expand Down Expand Up @@ -287,19 +289,35 @@ def _encrypt_context(cls, encryption_key: str, plain_text: str) -> str:
"""Encrypt plain text context into a encrypted message which can
be accepted by the host
"""
# Decode the encryption key
encryption_key_bytes = base64.b64decode(encryption_key.encode())
plain_text_bytes = pad(plain_text.encode(), 16)

# Pad the plaintext to be a multiple of the AES block size
padder = padding.PKCS7(algorithms.AES.block_size).padder()
plain_text_bytes = padder.update(plain_text.encode()) + padder.finalize()

# Initialization vector (IV) (fixed value for simplicity)
iv_bytes = '0123456789abcedf'.encode()

# Start encryption
cipher = AES.new(encryption_key_bytes, AES.MODE_CBC, iv=iv_bytes)
encrypted_bytes = cipher.encrypt(plain_text_bytes)
# Create AES cipher with CBC mode
cipher = Cipher(algorithms.AES(encryption_key_bytes),
modes.CBC(iv_bytes), backend=default_backend())

# Prepare final result
# Perform encryption
encryptor = cipher.encryptor()
encrypted_bytes = encryptor.update(plain_text_bytes) + encryptor.finalize()

# Compute SHA256 hash of the encryption key
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(encryption_key_bytes)
key_sha256 = digest.finalize()

# Encode IV, encrypted message, and SHA256 hash in base64
iv_base64 = base64.b64encode(iv_bytes).decode()
encrypted_base64 = base64.b64encode(encrypted_bytes).decode()
key_sha256 = SHA256Hash(encryption_key_bytes).digest()
key_sha256_base64 = base64.b64encode(key_sha256).decode()

# Return the final result
return f'{iv_base64}.{encrypted_base64}.{key_sha256_base64}'

def __enter__(self):
Expand Down
Loading