-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLivedb_client.py
109 lines (96 loc) · 3.69 KB
/
Livedb_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import random
import string
import sys
import requests
server_address_ = "http://127.0.0.1:5000"
# server_address_ = "https://livedb.pythonanywhere.com/"
class LiveDB:
def __init__(self, user_code, server_address=server_address_):
self.server_address = server_address
self.user_code = user_code
url = f"{self.server_address}/db_status"
try:
response = requests.get(url)
except requests.exceptions.RequestException as e:
print("Database is not active.")
sys.exit(1)
data = {"user_code": self.user_code}
response = requests.post(f"{self.server_address}/user_status", json=data)
if response.status_code == 200:
json_data = response.json()
if json_data["user_status"]:
print("User not found")
sys.exit(1)
else:
raise Exception(f"Error checking for data: {response.status_code}")
def insert(self, key, value):
data = {"user_code": self.user_code, "key": key, "value": value}
response = requests.post(f"{self.server_address}/insert_value", json=data)
if response.status_code == 200:
json_data = response.json()
if not json_data["result"]:
print(json_data["message"])
sys.exit(1)
else:
return True
else:
raise Exception(f"Error inserting data: {response.status_code}")
def read(self, key):
data = {"user_code": self.user_code, "key": key}
response = requests.post(f"{self.server_address}/get_value", json=data)
if response.status_code == 200:
json_data = response.json()
if not json_data["result"]:
print(json_data["message"])
sys.exit(1)
else:
return True
else:
raise Exception(f"Error reading data: {response.status_code}")
def delkey(self, key):
data = {"user_code": self.user_code, "key": key}
response = requests.post(f"{self.server_address}/delete_key", json=data)
if response.status_code == 200:
json_data = response.json()
if not json_data["result"]:
print(json_data["message"])
sys.exit(1)
else:
return True
else:
raise Exception(f"Error deleting key: {response.status_code}")
def delete_user_keys(self):
data = {"user_code": self.user_code}
response = requests.post(f"{self.server_address}/delete_user_keys", json=data)
if response.status_code == 200:
json_data = response.json()
if not json_data["result"]:
print(json_data["message"])
sys.exit(1)
else:
return True
else:
raise Exception(f"Error deleting key: {response.status_code}")
def list(self):
data = {"user_code": self.user_code}
response = requests.post(f"{self.server_address}/listkeyvalues", json=data)
if response.status_code == 200:
json_data = response.json()
if not json_data["result"]:
print(json_data["message"])
sys.exit(1)
else:
return json_data["key_value_pairs"]
else:
raise Exception(f"Error reading list: {response.status_code}")
if __name__ == "__main__":
user_code = "62c4163caa044745a3cf2f26b42228c8"
livedb = LiveDB(user_code)
values = livedb.list()
for i in values:
print(i)
livedb.insert("vake0c52", "Testing")
print(livedb.read("vake0c52"))
values = livedb.list()
for i in values:
print(i)