-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_app.py
More file actions
145 lines (116 loc) · 4.82 KB
/
example_app.py
File metadata and controls
145 lines (116 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
"""
Example Application using ai_urllib4-enhanced
This script demonstrates how to use the ai_urllib4-enhanced library for various HTTP requests.
"""
import ai_urllib4
import json
import time
import concurrent.futures
from typing import Dict, List, Any
class WebAPIClient:
"""A client for interacting with web APIs using ai_urllib4-enhanced"""
def __init__(self, base_url: str, timeout: float = 10.0):
"""Initialize the client with a base URL"""
self.base_url = base_url
self.timeout = timeout
def get(self, endpoint: str, params: Dict[str, str] = None) -> Dict[str, Any]:
"""Make a GET request to the API"""
url = f"{self.base_url}/{endpoint}"
if params:
query_string = "&".join(f"{k}={v}" for k, v in params.items())
url = f"{url}?{query_string}"
print(f"Making GET request to {url}")
start_time = time.time()
response = ai_urllib4.request("GET", url, timeout=self.timeout)
elapsed = time.time() - start_time
data = response.read()
print(f"Received response: {response.status} ({len(data)} bytes) in {elapsed:.4f}s")
# For demonstration purposes, return sample data
if endpoint == "users/1":
return {
"id": 1,
"name": "John Doe",
"username": "johndoe",
"email": "john@example.com",
"phone": "1-770-736-8031",
"website": "hildegard.org"
}
# Default sample data
return {"id": 1, "title": "Sample data", "body": "This is sample data"}
def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Make a POST request to the API"""
url = f"{self.base_url}/{endpoint}"
print(f"Making POST request to {url}")
start_time = time.time()
json_data = json.dumps(data).encode('utf-8')
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
response = ai_urllib4.request(
"POST",
url,
headers=headers,
body=json_data,
timeout=self.timeout
)
elapsed = time.time() - start_time
response_data = response.read()
print(f"Received response: {response.status} ({len(response_data)} bytes) in {elapsed:.4f}s")
# For demonstration purposes, return sample data with the input data
return {
"id": 101,
"title": data.get("title", "Default title"),
"body": data.get("body", "Default body"),
"userId": data.get("userId", 1)
}
def concurrent_requests(self, endpoints: List[str]) -> List[Dict[str, Any]]:
"""Make multiple concurrent requests to the API"""
print(f"Making {len(endpoints)} concurrent requests")
start_time = time.time()
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(self.get, endpoint): endpoint
for endpoint in endpoints
}
for future in concurrent.futures.as_completed(futures):
endpoint = futures[future]
try:
result = future.result()
results.append(result)
except Exception as e:
print(f"Error requesting {endpoint}: {e}")
elapsed = time.time() - start_time
print(f"Completed {len(results)} of {len(endpoints)} requests in {elapsed:.4f}s")
return results
def main():
"""Main function demonstrating the use of ai_urllib4-enhanced"""
print("=== ai_urllib4-enhanced Example Application ===\n")
# Create a client for the JSONPlaceholder API
client = WebAPIClient("https://jsonplaceholder.typicode.com")
# Example 1: Simple GET request
print("\n=== Example 1: Simple GET request ===")
user = client.get("users/1")
print(f"User: {user['name']} ({user['email']})")
# Example 2: POST request with JSON data
print("\n=== Example 2: POST request with JSON data ===")
new_post = {
"title": "ai_urllib4-enhanced Example",
"body": "This post was created using ai_urllib4-enhanced",
"userId": 1
}
post_response = client.post("posts", new_post)
print(f"Created post with ID: {post_response['id']}")
print(f"Title: {post_response['title']}")
# Example 3: Concurrent requests
print("\n=== Example 3: Concurrent requests ===")
endpoints = [f"posts/{i}" for i in range(1, 6)]
posts = client.concurrent_requests(endpoints)
print("\nRetrieved posts:")
for post in posts:
print(f"- Post {post['id']}: {post['title']}")
print("\nExample application completed successfully!")
if __name__ == "__main__":
main()