Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,5 @@ dmypy.json
.idea
src/testing/
src/aiotube.egg-info/
main.py
main.py
*.json
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ A library to access YouTube Public Data without YouTubeAPI

- [Discord](https://discord.gg/Amx7z4EjuA)
- [GitHub](https://github.com/jnsougata/aiotube)

# Table of Contents

- [Installation](#installing)
Expand Down
1 change: 0 additions & 1 deletion aiotube/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
SOFTWARE.
"""


from .errors import *
from .video import Video
from .query import Search
Expand Down
281 changes: 185 additions & 96 deletions aiotube/channel.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
import json
import re
from typing import List, Optional, Dict, Any

from .https import (
channel_about,
streams_data,
uploads_data,
channel_playlists,
upcoming_videos
upcoming_videos,
)
from .video import Video
from .pool import collect
from .utils import dup_filter
from urllib.parse import unquote
from typing import List, Optional, Dict
from .patterns import _ChannelPatterns as Patterns
from .utils import dup_filter, extract_initial_data
from .video import Video


class Channel:

_HEAD = 'https://www.youtube.com/channel/'
_CUSTOM = 'https://www.youtube.com/c/'
_USER = 'https://www.youtube.com/'
_HEAD = "https://www.youtube.com/channel/"
_CUSTOM = "https://www.youtube.com/c/"
_USER = "https://www.youtube.com/"

def __init__(self, channel_id: str):
"""
Expand All @@ -38,76 +35,17 @@ def __init__(self, channel_id: str):
self._target_url = self._CUSTOM + channel_id
elif results[0][0]:
self._usable_id = results[0][0]
self._target_url = self._HEAD + 'UC' + results[0][0]
self._target_url = self._HEAD + "UC" + results[0][0]
elif results[0][1]:
self._usable_id = results[0][1]
self._target_url = self._CUSTOM + results[0][1]
elif results[0][2]:
self._usable_id = results[0][2]
self._target_url = self._USER + '@' + results[0][2]
self.id = None
self.name = None
self.subscribers = None
self.views = None
self.country = None
self.custom_url = None
self.avatar = None
self.banner = None
self.url = None
self.description = None
self.socials = None
self.__meta = None
self._about_page = channel_about(self._target_url)
self.__populate()

def __populate(self):
self.__meta = self.__prepare_metadata()
for k, v in self.__meta.items():
setattr(self, k, v)

def __repr__(self):
return f'<Channel `{self._target_url}`>'

def __prepare_metadata(self) -> Optional[Dict[str, any]]:
"""
Returns channel metadata in a dict format

Returns
-------
Dict
Channel metadata containing the following keys:
id, name, subscribers, views, country, custom_url, avatar, banner, url, description, socials
"""
patterns = [
Patterns.name,
Patterns.avatar,
Patterns.banner,
Patterns.verified,
Patterns.socials
]
extracted = collect(lambda x: x.findall(self._about_page) or None, patterns)
name, avatar, banner, verified, socials = [e[0] if e else None for e in extracted]
info = re.compile("\\[{\"aboutChannelRenderer\":(.*?)],").search(self._about_page).group(1) + "]}}}}"
info = json.loads(info)["metadata"]["aboutChannelViewModel"]
return {
"id": info["channelId"],
"name": name,
"url": "https://www.youtube.com/channel/" + info["channelId"],
"description": info["description"],
"country": info["country"],
"custom_url": info["canonicalChannelUrl"],
"subscribers": info["subscriberCountText"].split(' ')[0],
"views": info["viewCountText"].replace(' views', ''),
"created_at": info["joinedDateText"]["content"].replace('Joined ', ''),
"video_count": info["videoCountText"].split(' ')[0],
"avatar": avatar,
"banner": banner,
"verified": bool(verified),
"socials": unquote(socials)
}
self._target_url = self._USER + "@" + results[0][2]
self.__html = channel_about(self._target_url)

@property
def metadata(self) -> Optional[Dict[str, any]]:
def metadata(self) -> Dict[str, any]:
"""
Returns channel metadata in a dict format

Expand All @@ -117,19 +55,139 @@ def metadata(self) -> Optional[Dict[str, any]]:
Channel metadata containing the following keys:
id, name, subscribers, views, country, custom_url, avatar, banner, url, description, socials etc.
"""
return self.__meta

@property
def live(self) -> bool:
"""
Checks if the channel is live

Returns
-------
bool
True if the channel is live
"""
return bool(self.current_streams)
obj = extract_initial_data(self.__html)
meta = obj["metadata"]["channelMetadataRenderer"]
detailed_meta = (
obj
["onResponseReceivedEndpoints"]
[0]
["showEngagementPanelEndpoint"]
["engagementPanel"]
["engagementPanelSectionListRenderer"]
["content"]
["sectionListRenderer"]
["contents"]
[0]
["itemSectionRenderer"]
["contents"]
[0]
["aboutChannelRenderer"]
["metadata"]
["aboutChannelViewModel"]
)
# is_verified = (
# self.__obj
# ["header"]
# ["pageHeaderRenderer"]
# ["content"]
# ["pageHeaderViewModel"]
# ["title"]
# ["dynamicTextViewModel"]
# ["text"].get(
# "attachmentRuns",
# [
# {
# "element": {
# "type": {
# "imageType": {
# "image": {
# "sources": [
# {"clientResource": {"imageName": "_"}}
# ]
# }
# }
# }
# }
# }
# ],
# )
# [0]
# ["element"]
# ["type"]
# ["imageType"]
# ["image"]
# ["sources"]
# [0]
# ["clientResource"]
# ["imageName"]
# ) == "CHECK_CIRCLE_FILLED"
is_verified = "'metadataBadgeRenderer': {'icon': {'iconType': 'CHECK_CIRCLE_THICK'}" in str(obj)
# is_live = (
# self.__obj
# ["contents"]
# ["twoColumnBrowseResultsRenderer"]
# ["tabs"]
# [0]
# ["tabRenderer"]
# ["content"]
# ["sectionListRenderer"]
# ["contents"]
# [0]
# ["channelFeaturedContentRenderer"]
# ["items"]
# [0]
# ["thumbnailOverlays"]
# [0]
# ["thumbnailOverlayTimeStatusRenderer"]
# ["text"]
# ["runs"]
# [0]
# ["text"]
# ) == "LIVE"
is_live = "'text': {'runs': [{'text': 'LIVE'}]" in str(obj)
data = {
"id": meta["externalId"],
"name": meta["title"],
"description": detailed_meta["description"],
"subscribers": detailed_meta["subscriberCountText"].split(" ")[0],
"views": detailed_meta["viewCountText"]
.replace(" views", "")
.replace(",", ""),
"country": detailed_meta.get("country", ""),
"url": "https://www.youtube.com/channel/" + meta["externalId"],
"avatars": obj["header"]["pageHeaderRenderer"]["content"][
"pageHeaderViewModel"
].get(
"image",
{
"decoratedAvatarViewModel": {
"avatar": {"avatarViewModel": {"image": {"sources": []}}}
}
},
)["decoratedAvatarViewModel"]
["avatar"]
["avatarViewModel"]
["image"]
["sources"],
"banners": obj["header"]["pageHeaderRenderer"]["content"][
"pageHeaderViewModel"
].get(
"banner",
{
"imageBannerViewModel": {
"image": {"sources": []}
}
}
)["imageBannerViewModel"]["image"]["sources"],
"rss_url": meta.pop("rssUrl"),
"video_count": int(
detailed_meta.pop("videoCountText").replace(",", "").split(" ")[0]
),
"custom_url": detailed_meta["canonicalChannelUrl"],
"joined_date": detailed_meta["joinedDateText"]["content"].replace(
"Joined ", ""
),
"socials": [
link["channelExternalLinkViewModel"]["link"]["content"]
for link in detailed_meta.get("links", [])
],
"keywords": obj["microformat"]["microformatDataRenderer"]["tags"],
"is_family_safe": meta["isFamilySafe"],
"available_country_codes": meta["availableCountryCodes"],
"verified": is_verified,
"live": is_live,
}
return data

@property
def streaming_now(self) -> Optional[str]:
Expand Down Expand Up @@ -158,7 +216,11 @@ def current_streams(self) -> Optional[List[str]]:
filtered_ids = dup_filter(Patterns.stream_ids.findall(raw))
if not filtered_ids:
return None
return [id_ for id_ in filtered_ids if f"vi/{id_}/hqdefault_live.jpg" in raw]
return [
id_
for id_ in filtered_ids
if f"vi/{id_}/hqdefault_live.jpg" in streams_data(raw)
]

@property
def old_streams(self) -> Optional[List[str]]:
Expand All @@ -174,7 +236,9 @@ def old_streams(self) -> Optional[List[str]]:
filtered_ids = dup_filter(Patterns.stream_ids.findall(raw))
if not filtered_ids:
return None
return [id_ for id_ in filtered_ids if f"vi/{id_}/hqdefault_live.jpg" not in raw]
return [
id_ for id_ in filtered_ids if f"vi/{id_}/hqdefault_live.jpg" not in raw
]

@property
def last_streamed(self) -> Optional[str]:
Expand All @@ -188,7 +252,7 @@ def last_streamed(self) -> Optional[str]:
"""
ids = self.old_streams
return ids[0] if ids else None

def uploads(self, limit: int = 20) -> Optional[List[str]]:
"""
Fetches the ids of all uploaded videos
Expand All @@ -203,7 +267,9 @@ def uploads(self, limit: int = 20) -> Optional[List[str]]:
List[str] | None
The ids of uploaded videos or None
"""
return dup_filter(Patterns.upload_ids.findall(uploads_data(self._target_url)), limit)
return dup_filter(
Patterns.upload_ids.findall(uploads_data(self._target_url)), limit
)

@property
def last_uploaded(self) -> Optional[str]:
Expand Down Expand Up @@ -250,14 +316,37 @@ def upcomings(self) -> Optional[List[str]]:
video_ids = Patterns.upcoming.findall(raw)
return video_ids

@property
def playlists(self) -> Optional[List[str]]:
@staticmethod
def __format_playlist_data(raw: Dict[str, Any]):
return {
"id": raw["playlistId"],
"title": raw["title"]["runs"][0]["text"],
"video_count": raw["videoCountText"]["runs"][0]["text"],
"thumbnail": raw["thumbnail"]["thumbnails"][0]["url"],
"url": "https://www.youtube.com/playlist?list=" + raw["playlistId"],
}

def playlists(self) -> Optional[List[Dict[str, Any]]]:
"""
Fetches the ids of all playlists
Fetches the basic metadata of some public playlists.

Returns
-------
List[str] | None
The ids of all playlists or None
List[Dict[str, Any]] | None
The basic metadata of all playlists or None
"""
return dup_filter(Patterns.playlists.findall(channel_playlists(self._target_url)))
obj = extract_initial_data(
channel_playlists(self._target_url))["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
index = next(
(index for (index, o) in enumerate(obj)
if o.get("tabRenderer", {"title": ""})["title"] == "Playlists"), None
)
if not index:
return None
raw_playlists = (
obj[index]
["tabRenderer"]["content"]["sectionListRenderer"]["contents"][0]
["itemSectionRenderer"]["contents"][0]["gridRenderer"]["items"]
)
filtered = [item for item in raw_playlists if item.get("gridPlaylistRenderer")]
return [self.__format_playlist_data(item["gridPlaylistRenderer"]) for item in filtered]
2 changes: 1 addition & 1 deletion aiotube/extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
trending_feeds,
trending_streams,
_get_trending_learning_videos,
trending_sports
trending_sports,
)
from .utils import dup_filter
from .patterns import _ExtraPatterns as Patterns
Expand Down
Loading