-
Notifications
You must be signed in to change notification settings - Fork 52
Add telegram client #111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add telegram client #111
Changes from all commits
12cc75c
f5f29c0
a7793c3
bfe2ceb
7af4147
8433c4a
091db8e
8e93a36
b8d0cba
198e4d6
3fc9d49
798a72f
3078682
94a8c01
b0bc592
de2995c
e618089
2122222
c1d309a
8795a7b
a5b4a6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ def get_placeholder_user(): | |
email='[email protected]', | ||
password='1a2s3d4f5g6', | ||
full_name='My Name', | ||
telegram_id='' | ||
) | ||
|
||
|
||
|
@@ -63,8 +64,7 @@ async def update_user_fullname( | |
session.commit() | ||
|
||
url = router.url_path_for("profile") | ||
response = RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
return response | ||
return RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
|
||
|
||
@router.post("/update_user_email") | ||
|
@@ -110,14 +110,27 @@ async def upload_user_photo( | |
# Save to database | ||
user.avatar = await process_image(pic, user) | ||
session.commit() | ||
|
||
finally: | ||
session.close() | ||
|
||
url = router.url_path_for("profile") | ||
return RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
|
||
|
||
@router.post("/update_telegram_id") | ||
async def update_telegram_id( | ||
request: Request, session=Depends(get_db)): | ||
|
||
user = session.query(User).filter_by(id=1).first() | ||
data = await request.form() | ||
new_telegram_id = data['telegram_id'] | ||
|
||
# Update database | ||
user.telegram_id = new_telegram_id | ||
session.commit() | ||
|
||
url = router.url_path_for("profile") | ||
return RedirectResponse(url=url, status_code=HTTP_302_FOUND) | ||
|
||
|
||
async def process_image(image, user): | ||
img = Image.open(io.BytesIO(image)) | ||
width, height = img.size | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from fastapi import APIRouter, Body, Depends, Request | ||
|
||
from app.database.database import get_db | ||
from app.database.models import User | ||
from app.telegram.handlers import MessageHandler, reply_unknown_user | ||
from app.telegram.models import Chat | ||
|
||
|
||
router = APIRouter( | ||
prefix="/telegram", | ||
tags=["telegram"], | ||
responses={404: {"description": "Not found"}}, | ||
) | ||
|
||
|
||
@router.get("/") | ||
async def telegram(request: Request, session=Depends(get_db)): | ||
|
||
# todo: Add templating | ||
return "Start using PyLander telegram bot!" | ||
|
||
|
||
@router.post("/") | ||
async def bot_client(req: dict = Body(...), session=Depends(get_db)): | ||
chat = Chat(req) | ||
|
||
# Check if current chatter is registered to use the bot | ||
user = session.query(User).filter_by(telegram_id=chat.user_id).first() | ||
if user is None: | ||
return await reply_unknown_user(chat) | ||
|
||
message = MessageHandler(chat, user) | ||
return await message.process_callback() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from app import config | ||
from app.dependencies import get_settings | ||
from .models import Bot | ||
|
||
|
||
settings: config.Settings = get_settings() | ||
|
||
BOT_API = settings.bot_api | ||
WEBHOOK_URL = settings.webhook_url | ||
|
||
telegram_bot = Bot(BOT_API, WEBHOOK_URL) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import datetime | ||
|
||
from .keyboards import ( | ||
DATE_FORMAT, gen_inline_keyboard, get_this_week_buttons, show_events_kb) | ||
from .models import Chat | ||
from .bot import telegram_bot | ||
from app.database.models import User | ||
|
||
|
||
class MessageHandler: | ||
def __init__(self, chat: Chat, user: User): | ||
self.chat = chat | ||
self.user = user | ||
self.handlers = {} | ||
self.handlers['/start'] = self.start_handler | ||
self.handlers['/show_events'] = self.show_events_handler | ||
self.handlers['Today'] = self.today_handler | ||
self.handlers['This week'] = self.this_week_handler | ||
|
||
# Add next 6 days to handlers dict | ||
for row in get_this_week_buttons(): | ||
for button in row: | ||
self.handlers[button['text']] = self.chosen_day_handler | ||
|
||
async def process_callback(self): | ||
if self.chat.message in self.handlers: | ||
return await self.handlers[self.chat.message]() | ||
return await self.default_handler() | ||
|
||
async def default_handler(self): | ||
answer = "Unknown command." | ||
await telegram_bot.send_message(chat_id=self.chat.user_id, text=answer) | ||
return answer | ||
|
||
async def start_handler(self): | ||
answer = f'''Hello, {self.chat.first_name}! | ||
Welcome to Pylander telegram client!''' | ||
await telegram_bot.send_message(chat_id=self.chat.user_id, text=answer) | ||
return answer | ||
|
||
async def show_events_handler(self): | ||
answer = 'Choose events day.' | ||
await telegram_bot.send_message( | ||
chat_id=self.chat.user_id, | ||
text=answer, | ||
reply_markup=show_events_kb) | ||
return answer | ||
|
||
async def today_handler(self): | ||
today = datetime.datetime.today() | ||
events = [ | ||
_.events for _ in self.user.events | ||
if _.events.start <= today <= _.events.end] | ||
|
||
answer = f"{today.strftime('%B %d')}, {today.strftime('%A')} Events:\n" | ||
|
||
if not events: | ||
answer = "There're no events today." | ||
|
||
for event in events: | ||
answer += f''' | ||
From {event.start.strftime('%d/%m %H:%M')} \ | ||
to {event.end.strftime('%d/%m %H:%M')}: {event.title}.\n''' | ||
|
||
await telegram_bot.send_message(chat_id=self.chat.user_id, text=answer) | ||
return answer | ||
|
||
async def this_week_handler(self): | ||
answer = 'Choose a day.' | ||
this_week_kb = gen_inline_keyboard(get_this_week_buttons()) | ||
|
||
await telegram_bot.send_message( | ||
chat_id=self.chat.user_id, | ||
text=answer, | ||
reply_markup=this_week_kb) | ||
return answer | ||
|
||
async def chosen_day_handler(self): | ||
# Convert chosen day (string) to datetime format | ||
chosen_date = datetime.datetime.strptime( | ||
self.chat.message, DATE_FORMAT) | ||
|
||
events = [ | ||
_.events for _ in self.user.events | ||
if _.events.start <= chosen_date <= _.events.end] | ||
|
||
answer = f"{chosen_date.strftime('%B %d')}, \ | ||
{chosen_date.strftime('%A')} Events:\n" | ||
|
||
if not events: | ||
answer = f"There're no events on {chosen_date.strftime('%B %d')}." | ||
|
||
for event in events: | ||
answer += f''' | ||
From {event.start.strftime('%d/%m %H:%M')} \ | ||
to {event.end.strftime('%d/%m %H:%M')}: {event.title}.\n''' | ||
|
||
await telegram_bot.send_message(chat_id=self.chat.user_id, text=answer) | ||
return answer | ||
|
||
|
||
async def reply_unknown_user(chat): | ||
answer = f''' | ||
Hello, {chat.first_name}! | ||
|
||
To use PyLander Bot you have to register | ||
your Telegram Id in your profile page. | ||
|
||
Your Id is {chat.user_id} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why a user needs to know his chat id? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned above, to use a bot you have to register an id in the profile page. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I probably missed your prev comment. |
||
Keep it secret! | ||
|
||
https://calendar.pythonic.guru/profile/ | ||
''' | ||
await telegram_bot.send_message(chat_id=chat.user_id, text=answer) | ||
return answer |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import datetime | ||
import json | ||
from typing import Any, Dict, List | ||
|
||
|
||
show_events_buttons = [ | ||
[ | ||
{'text': 'Today', 'callback_data': 'Today'}, | ||
{'text': 'This week', 'callback_data': 'This week'} | ||
] | ||
] | ||
|
||
DATE_FORMAT = '%d %b %Y' | ||
|
||
|
||
def get_this_week_buttons() -> List[List[Any]]: | ||
today = datetime.datetime.today() | ||
buttons = [] | ||
for day in range(1, 7): | ||
day = today + datetime.timedelta(days=day) | ||
buttons.append(day.strftime(DATE_FORMAT)) | ||
|
||
return [ | ||
[ | ||
{'text': buttons[0], | ||
'callback_data': buttons[0]}, | ||
{'text': buttons[1], | ||
'callback_data': buttons[1]}, | ||
{'text': buttons[2], | ||
'callback_data': buttons[2]} | ||
], | ||
[ | ||
{'text': buttons[3], | ||
'callback_data': buttons[3]}, | ||
{'text': buttons[4], | ||
'callback_data': buttons[4]}, | ||
{'text': buttons[5], | ||
'callback_data': buttons[5]} | ||
] | ||
] | ||
|
||
|
||
def gen_inline_keyboard(buttons: List[List[Any]]) -> Dict[str, Any]: | ||
return {'reply_markup': json.dumps({'inline_keyboard': buttons})} | ||
|
||
|
||
show_events_kb = gen_inline_keyboard(show_events_buttons) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from typing import Any, Dict, Optional | ||
|
||
from httpx import AsyncClient | ||
import requests | ||
|
||
|
||
class Chat: | ||
def __init__(self, data: Dict): | ||
self.message = self._get_message_content(data) | ||
self.user_id = self._get_user_id(data) | ||
self.first_name = self._get_first_name(data) | ||
|
||
def _get_message_content(self, data: Dict) -> str: | ||
if 'callback_query' in data: | ||
return data['callback_query']['data'] | ||
return data['message']['text'] | ||
|
||
def _get_user_id(self, data: Dict) -> str: | ||
if 'callback_query' in data: | ||
return data['callback_query']['from']['id'] | ||
return data['message']['from']['id'] | ||
|
||
def _get_first_name(self, data: Dict) -> str: | ||
if 'callback_query' in data: | ||
return data['callback_query']['from']['first_name'] | ||
return data['message']['from']['first_name'] | ||
|
||
|
||
class Bot: | ||
def __init__(self, bot_api: str, webhook_url: str): | ||
self.base = self._set_base_url(bot_api) | ||
self.webhook_setter_url = self._set_webhook_setter_url(webhook_url) | ||
|
||
def _set_base_url(self, bot_api: str) -> str: | ||
return f'https://api.telegram.org/bot{bot_api}/' | ||
|
||
def _set_webhook_setter_url(self, webhook_url: str) -> str: | ||
return f'{self.base}setWebhook?url={webhook_url}/telegram/' | ||
|
||
def set_webhook(self): | ||
return requests.get(self.webhook_setter_url) | ||
|
||
def drop_webhook(self): | ||
data = {'drop_pending_updates': True} | ||
return requests.get(url=f'{self.base}deleteWebhook', data=data) | ||
|
||
async def send_message( | ||
self, chat_id: str, | ||
text: str, | ||
reply_markup: Optional[Dict[str, Any]] = None): | ||
async with AsyncClient(base_url=self.base) as ac: | ||
message = { | ||
'chat_id': chat_id, | ||
'text': text} | ||
if reply_markup: | ||
message.update(reply_markup) | ||
return await ac.post('sendMessage', data=message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by unknown user?