diff --git a/.gitignore b/.gitignore
index 1d3a133f..b443d97d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,5 @@ server/coverage
 # Mac specific files ignored.
 extensions/.DS_Store
 extensions/css/.DS_Store
-.DS_Store
\ No newline at end of file
+.DS_Store
+.mypy_cache
\ No newline at end of file
diff --git a/condense/summarizer.py b/condense/summarizer.py
index bd6a61c5..c746e790 100644
--- a/condense/summarizer.py
+++ b/condense/summarizer.py
@@ -1,9 +1,8 @@
-import os
 import re
 import sys
 import logging
 import argparse
-from typing import List
+from typing import Dict, List, Tuple
 
 import nltk
 from transformers import pipeline
@@ -34,44 +33,68 @@ def make_parser() -> argparse.ArgumentParser:
     return parser
 
 
-def clean_data(data: List[dict]) -> str:
+def clean_data(data: List[Dict]) -> List[Dict]:
+    final_data = []
     if data:
         sentences = []
+        start, end, cnt = 0.00, 0, 0
         # Removing extra whitespaces
         for sentence in data:
+            end = sentence["end"]
+            cnt += 1
             sentence_text = re.sub(
                 "[^a-zA-Z0-9.,;:()'\"\\s]", "", sentence["text"]
             )  # Add special symbols to preserve inside the square brackets with numbers and characters
             sentence_sym = re.sub("\\s+", " ", sentence_text)
             sentences.append(sentence_sym.strip())
-        display = " ".join(sentences)
-        return display
+            # The model accepts roughly 500 tokens and each sentence is about 10 words, so we emit a chunk after cnt == 12 sentences to stay safely under that limit
+            if cnt == 12:
+                text = " ".join(sentences)
+                final_data.append({"start": start, "end": end, "text": text})
+                start, cnt = end, 0
+                sentences = []
+
+        return final_data
     else:
         raise ValueError("No data found")
 
 
-def summerize_text(video_url: str) -> str:
-    nltk.download("punkt")
-    transcript_data = get_transcript(video_url)
-    data_clean = clean_data(transcript_data)
-    sentences = nltk.tokenize.sent_tokenize(data_clean)
-    data = " ".join(sentences)
-
+def get_summary(data: List[Dict[str, str]]) -> Tuple[List[Dict], List[Dict]]:
     summarizer = pipeline("summarization")
-    max_chunk_length = 400  # Define the maximum length for each chunk
-    chunks = [data[i : i + max_chunk_length] for i in range(0, len(data), max_chunk_length)]
     summary = []
-    for chunk in chunks:
-        summary_text = summarizer(chunk, max_length=50, min_length=10, do_sample=False)[0]["summary_text"]
-        summary.append(summary_text)
-    return " ".join(summary)
+    for chunk in data:
+        summary_text = summarizer(chunk["text"], max_length=50, min_length=1, do_sample=False)[0]["summary_text"]
+        summary.append({"start": chunk["start"], "end": chunk["end"], "summary_text": summary_text})
+
+    time_stamp = []
+    for chunk in summary:
+        summary_text = summarizer(chunk["summary_text"], max_length=13, min_length=1, do_sample=False)[0][
+            "summary_text"
+        ]
+        time_stamp.append({"start": chunk["start"], "end": chunk["end"], "summary_text": summary_text})
+
+    return (summary, time_stamp)
+
+
+def summerize_text(video_url: str) -> Tuple[List[Dict], List[Dict]]:
+    nltk.download("punkt")
+    data = get_transcript(video_url)
+    data = clean_data(data)
+    for sentence in data:
+        sentences = nltk.tokenize.sent_tokenize(sentence["text"])
+        sentence["text"] = " ".join(sentences)
+
+    summary, time_stamp = get_summary(data)
+    return (summary, time_stamp)
 
 
 def main(argv=None) -> int:
     parser = make_parser()
-    args = parser.parse_args(argv)
-    summary = summerize_text(args.video_url)
-    print(summary)
+    argv = parser.parse_args(argv)
+    summary, time_stamp = summerize_text(argv.video_url)
+    summary_text = " ".join([f"{chunk['summary_text']}" for chunk in summary])
+    summary_dict = {"summary": summary_text, "time_stamp": time_stamp}
+    print(summary_dict)
     return 0
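Note (illustrative, not part of the patch): after this change, main prints a dict shaped like {"summary": "<joined chunk summaries>", "time_stamp": [{"start": 0.0, "end": ..., "summary_text": "..."}, ...]}. One edge case the new clean_data leaves open is a transcript whose sentence count is not a multiple of 12: the trailing sentences never reach final_data, because a chunk is only emitted when cnt hits 12. A minimal sketch of how that remainder could be flushed, assuming the same chunk layout (the helper name below is hypothetical and the regex cleaning is omitted for brevity):

from typing import Dict, List


def clean_data_with_remainder(data: List[Dict]) -> List[Dict]:
    # Same chunking idea as clean_data above: group 12 sentences per chunk,
    # carrying start/end timestamps along with the joined text.
    final_data: List[Dict] = []
    sentences: List[str] = []
    start, end, cnt = 0.00, 0, 0
    for sentence in data:
        end = sentence["end"]
        cnt += 1
        sentences.append(sentence["text"].strip())
        if cnt == 12:
            final_data.append({"start": start, "end": end, "text": " ".join(sentences)})
            start, cnt = end, 0
            sentences = []
    if sentences:
        # Flush the final, partially filled chunk so short tail segments are not dropped.
        final_data.append({"start": start, "end": end, "text": " ".join(sentences)})
    return final_data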