-
Notifications
You must be signed in to change notification settings - Fork 52
Add telegram client #111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add telegram client #111
Changes from 15 commits
12cc75c
f5f29c0
a7793c3
bfe2ceb
7af4147
8433c4a
091db8e
8e93a36
b8d0cba
198e4d6
3fc9d49
798a72f
3078682
94a8c01
b0bc592
de2995c
e618089
2122222
c1d309a
8795a7b
a5b4a6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ def get_placeholder_user(): | |
email='[email protected]', | ||
password='1a2s3d4f5g6', | ||
full_name='My Name', | ||
telegram_id='' | ||
) | ||
|
||
|
||
|
@@ -43,8 +44,6 @@ async def profile( | |
session.commit() | ||
user = session.query(User).filter_by(id=1).first() | ||
|
||
session.close() | ||
|
||
return templates.TemplateResponse("profile.html", { | ||
"request": request, | ||
"user": user, | ||
|
@@ -64,11 +63,8 @@ async def update_user_fullname( | |
user.full_name = new_fullname | ||
session.commit() | ||
|
||
session.close() | ||
|
||
url = router.url_path_for("profile") | ||
response = RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
return response | ||
return RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
|
||
|
||
@router.post("/update_user_email") | ||
|
@@ -83,8 +79,6 @@ async def update_user_email( | |
user.email = new_email | ||
session.commit() | ||
|
||
session.close() | ||
|
||
url = router.url_path_for("profile") | ||
return RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
|
||
|
@@ -101,8 +95,6 @@ async def update_profile( | |
user.description = new_description | ||
session.commit() | ||
|
||
session.close() | ||
|
||
url = router.url_path_for("profile") | ||
return RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
|
||
|
@@ -118,14 +110,27 @@ async def upload_user_photo( | |
# Save to database | ||
user.avatar = await process_image(pic, user) | ||
session.commit() | ||
|
||
finally: | ||
session.close() | ||
|
||
url = router.url_path_for("profile") | ||
return RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
|
||
|
||
@router.post("/update_telegram_id") | ||
async def update_telegram_id( | ||
request: Request, session=Depends(get_db)): | ||
|
||
user = session.query(User).filter_by(id=1).first() | ||
data = await request.form() | ||
new_telegram_id = data['telegram_id'] | ||
|
||
# Update database | ||
user.telegram_id = new_telegram_id | ||
session.commit() | ||
|
||
url = router.url_path_for("profile") | ||
return RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
|
||
|
||
async def process_image(image, user): | ||
img = Image.open(io.BytesIO(image)) | ||
width, height = img.size | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from fastapi import APIRouter, Body, Depends, Request | ||
|
||
from app.database.database import get_db | ||
from app.database.models import User | ||
from app.telegram.handlers import MessageHandler, reply_unknown_user | ||
from app.telegram.models import Chat | ||
|
||
|
||
router = APIRouter( | ||
prefix="/telegram", | ||
tags=["telegram"], | ||
responses={404: {"description": "Not found"}}, | ||
) | ||
|
||
|
||
@router.get("/") | ||
async def telegram(request: Request, session=Depends(get_db)): | ||
|
||
# todo: Add templating | ||
return "Start using PyLander telegram bot!" | ||
|
||
|
||
@router.post("/") | ||
def bot_client(req: dict = Body(...), session=Depends(get_db)): | ||
chat = Chat(req) | ||
|
||
# Check if current chatter is registered to use the bot | ||
user = session.query(User).filter_by(telegram_id=chat.user_id).first() | ||
if user is None: | ||
return reply_unknown_user(chat) | ||
|
||
message = MessageHandler(chat, user) | ||
return message.process_callback() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import datetime | ||
|
||
from .keyboards import ( | ||
DATE_FORMAT, gen_inline_keyboard, get_this_week_buttons, show_events_kb) | ||
from .models import Chat | ||
from .pylander import pylander | ||
from app.database.models import User | ||
|
||
|
||
class MessageHandler: | ||
def __init__(self, chat: Chat, user: User): | ||
self.chat = chat | ||
self.user = user | ||
self.handlers = {} | ||
self.handlers['/start'] = self.start_handler | ||
self.handlers['/show_events'] = self.show_events_handler | ||
self.handlers['Today'] = self.today_handler | ||
self.handlers['This week'] = self.this_week_handler | ||
|
||
# Add next 6 days to handlers dict | ||
for row in get_this_week_buttons(): | ||
for button in row: | ||
self.handlers[button['text']] = self.chosen_day_handler | ||
|
||
def process_callback(self): | ||
if self.chat.message in self.handlers: | ||
return self.handlers[self.chat.message]() | ||
return self.default_handler() | ||
|
||
def default_handler(self): | ||
answer = "Unknown command." | ||
pylander.send_message(chat_id=self.chat.user_id, text=answer) | ||
return answer | ||
|
||
def start_handler(self): | ||
answer = f'''Hello, {self.chat.first_name}! | ||
Welcome to Pylander telegram client!''' | ||
pylander.send_message(chat_id=self.chat.user_id, text=answer) | ||
return answer | ||
|
||
def show_events_handler(self): | ||
answer = 'Choose events day.' | ||
pylander.send_message( | ||
chat_id=self.chat.user_id, | ||
text=answer, | ||
reply_markup=show_events_kb) | ||
return answer | ||
|
||
def today_handler(self): | ||
today = datetime.date.today() | ||
events = [ | ||
event for event in self.user.events | ||
if event.start <= today <= event.end] | ||
|
||
answer = f"{today.strftime('%B %d')}, {today.strftime('%A')} Events:\n" | ||
|
||
if not len(events): | ||
answer = "There're no events today." | ||
|
||
for event in events: | ||
answer += f'\n\n{event.title}: from {event.start} to {event.ends}.' | ||
|
||
pylander.send_message(chat_id=self.chat.user_id, text=answer) | ||
return answer | ||
|
||
def this_week_handler(self): | ||
answer = 'Choose a day.' | ||
this_week_kb = gen_inline_keyboard(get_this_week_buttons()) | ||
|
||
pylander.send_message( | ||
chat_id=self.chat.user_id, | ||
text=answer, | ||
reply_markup=this_week_kb) | ||
return answer | ||
|
||
def chosen_day_handler(self): | ||
# Convert chosen day (string) to datetime format | ||
chosen_date = datetime.datetime.strptime( | ||
self.chat.message, DATE_FORMAT) | ||
|
||
events = [ | ||
event for event in self.user.events | ||
if event.start <= chosen_date <= event.end] | ||
|
||
answer = f"{chosen_date.strftime('%B %d')}, \ | ||
{chosen_date.strftime('%A')} Events:\n" | ||
|
||
if not len(events): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be more Pythonic: |
||
answer = f"There're no events on {chosen_date.strftime('%B %d')}." | ||
|
||
for event in events: | ||
answer += f'\n\n{event.title}: from {event.start} to {event.ends}.' | ||
|
||
pylander.send_message(chat_id=self.chat.user_id, text=answer) | ||
return answer | ||
|
||
|
||
def reply_unknown_user(chat): | ||
answer = f''' | ||
Hello, {chat.first_name}! | ||
|
||
To use PyLander Bot you have to register | ||
your Telegram Id in your profile page. | ||
|
||
Your Id is {chat.user_id} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why a user needs to know his chat id? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned above, to use a bot you have to register an id in the profile page. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I probably missed your prev comment. |
||
Keep it secret! | ||
|
||
https://calendar.pythonic.guru/profile/ | ||
''' | ||
pylander.send_message(chat_id=chat.user_id, text=answer) | ||
return answer |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import datetime | ||
import json | ||
from typing import Any, Dict, List | ||
|
||
|
||
show_events_buttons = [ | ||
[ | ||
{'text': 'Today', 'callback_data': 'Today'}, | ||
{'text': 'This week', 'callback_data': 'This week'} | ||
] | ||
] | ||
|
||
DATE_FORMAT = '%d %b %Y' | ||
|
||
|
||
def get_this_week_buttons() -> List[List[Any]]: | ||
today = datetime.date.today() | ||
day1 = today + datetime.timedelta(days=1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for loop instead? |
||
day2 = today + datetime.timedelta(days=2) | ||
day3 = today + datetime.timedelta(days=3) | ||
day4 = today + datetime.timedelta(days=4) | ||
day5 = today + datetime.timedelta(days=5) | ||
day6 = today + datetime.timedelta(days=6) | ||
|
||
return [ | ||
[ | ||
{'text': day1.strftime(DATE_FORMAT), | ||
'callback_data': day1.strftime(DATE_FORMAT)}, | ||
{'text': day2.strftime(DATE_FORMAT), | ||
'callback_data': day2.strftime(DATE_FORMAT)}, | ||
{'text': day3.strftime(DATE_FORMAT), | ||
'callback_data': day3.strftime(DATE_FORMAT)} | ||
], | ||
[ | ||
{'text': day4.strftime(DATE_FORMAT), | ||
'callback_data': day4.strftime(DATE_FORMAT)}, | ||
{'text': day5.strftime(DATE_FORMAT), | ||
'callback_data': day5.strftime(DATE_FORMAT)}, | ||
{'text': day6.strftime(DATE_FORMAT), | ||
'callback_data': day6.strftime(DATE_FORMAT)} | ||
] | ||
] | ||
|
||
|
||
def gen_inline_keyboard(buttons: List[List[Any]]) -> Dict[str, Any]: | ||
return {'reply_markup': json.dumps({'inline_keyboard': buttons})} | ||
|
||
|
||
show_events_kb = gen_inline_keyboard(show_events_buttons) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
from typing import Any, Dict, Optional | ||
|
||
import requests | ||
|
||
|
||
class Chat: | ||
def __init__(self, data): | ||
self.message = self._get_message_content(data) | ||
self.user_id = self._get_user_id(data) | ||
self.first_name = self._get_first_name(data) | ||
|
||
def _get_message_content(self, data): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Type annotation is missing. Also, what is |
||
if 'callback_query' in data: | ||
return data['callback_query']['data'] | ||
return data['message']['text'] | ||
|
||
def _get_user_id(self, data): | ||
if 'callback_query' in data: | ||
return data['callback_query']['from']['id'] | ||
return data['message']['from']['id'] | ||
|
||
def _get_first_name(self, data): | ||
if 'callback_query' in data: | ||
return data['callback_query']['from']['first_name'] | ||
return data['message']['from']['first_name'] | ||
|
||
|
||
class Bot: | ||
def __init__(self, bot_api: str, webhook_url: str): | ||
self.base = self._set_base_url(bot_api) | ||
self.webhook_setter_url = self._set_webhook_setter_url(webhook_url) | ||
|
||
def _set_base_url(self, bot_api): | ||
return f'https://api.telegram.org/bot{bot_api}/' | ||
|
||
def _set_webhook_setter_url(self, webhook_url: str): | ||
return f'{self.base}setWebhook?url={webhook_url}/telegram/' | ||
|
||
def set_webhook(self): | ||
return requests.get(self.webhook_setter_url) | ||
|
||
def drop_webhook(self): | ||
data = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be one line |
||
'drop_pending_updates': True | ||
} | ||
return requests.get( | ||
url=f'{self.base}deleteWebhook', data=data) | ||
|
||
def send_message( | ||
self, chat_id: str, | ||
text: str, | ||
reply_markup: Optional[Dict[str, Any]] = None): | ||
message = { | ||
'chat_id': chat_id, | ||
'text': text, | ||
'reply_markup': None} | ||
if reply_markup: | ||
message.update(reply_markup) | ||
return requests.post(f'{self.base}sendMessage', data=message) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be async/background process. Currently it blocks the whole server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
as telegram_bot
, to prevent future name collisions?