Compare commits

...

10 Commits

@ -22,28 +22,37 @@ from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig
from home.src.ta.helper import get_channel_overwrites, ignore_filelist
from home.src.ta.settings import EnvironmentSettings
from home.src.ta.ta_redis import RedisQueue
class VideoDownloader:
"""
handle the video download functionality
if not initiated with list, take from queue
"""
class DownloaderBase:
"""base class for shared config"""
CACHE_DIR = EnvironmentSettings.CACHE_DIR
MEDIA_DIR = EnvironmentSettings.MEDIA_DIR
CHANNEL_QUEUE = "download:channel"
PLAYLIST_QUEUE = "download:playlist:full"
PLAYLIST_QUICK = "download:playlist:quick"
VIDEO_QUEUE = "download:video"
def __init__(self, task=False):
self.obs = False
self.channel_overwrites = get_channel_overwrites()
def __init__(self, task):
self.task = task
self.config = AppConfig().config
self.channel_overwrites = get_channel_overwrites()
self.now = int(datetime.now().timestamp())
class VideoDownloader(DownloaderBase):
"""handle the video download functionality"""
def __init__(self, task=False):
super().__init__(task)
self.obs = False
self._build_obs()
self.channels = set()
self.videos = set()
def run_queue(self, auto_only=False):
def run_queue(self, auto_only=False) -> int:
"""setup download queue in redis loop until no more items"""
downloaded = 0
while True:
video_data = self._get_next(auto_only)
if self.task.is_stopped() or not video_data:
@ -62,18 +71,18 @@ class VideoDownloader:
self._notify(video_data, "Add video metadata to index", progress=1)
video_type = VideoTypeEnum(video_data["vid_type"])
vid_dict = index_new_video(youtube_id, video_type=video_type)
self.channels.add(vid_dict["channel"]["channel_id"])
self.videos.add(vid_dict["youtube_id"])
RedisQueue(self.CHANNEL_QUEUE).add(channel_id)
RedisQueue(self.VIDEO_QUEUE).add(youtube_id)
self._notify(video_data, "Move downloaded file to archive")
self.move_to_archive(vid_dict)
self._delete_from_pending(youtube_id)
downloaded += 1
# post processing
self._add_subscribed_channels()
DownloadPostProcess(self).run()
DownloadPostProcess(self.task).run()
return self.videos
return downloaded
def _notify(self, video_data, message, progress=False):
"""send progress notification to task"""
@ -263,18 +272,6 @@ class VideoDownloader:
path = f"ta_download/_doc/{youtube_id}?refresh=true"
_, _ = ElasticWrap(path).delete()
def _add_subscribed_channels(self):
"""add all channels subscribed to refresh"""
all_subscribed = PlaylistSubscription().get_playlists()
if not all_subscribed:
return
channel_ids = [i["playlist_channel_id"] for i in all_subscribed]
for channel_id in channel_ids:
self.channels.add(channel_id)
return
def _reset_auto(self):
"""reset autostart to defaults after queue stop"""
path = "ta_download/_update_by_query"
@ -291,26 +288,20 @@ class VideoDownloader:
print(f"[download] reset auto start on {updated} videos.")
class DownloadPostProcess:
class DownloadPostProcess(DownloaderBase):
"""handle task to run after download queue finishes"""
def __init__(self, download: VideoDownloader) -> None:
self.download: VideoDownloader = download
self.now = int(datetime.now().timestamp())
self.channel_overwrites: dict | None = None
def run(self):
"""run all functions"""
self.channel_overwrites = get_channel_overwrites()
self.auto_delete_all()
self.auto_delete_overwrites()
to_refresh = self.refresh_playlist()
self.match_videos(to_refresh)
self.refresh_playlist()
self.match_videos()
self.get_comments()
def auto_delete_all(self):
"""handle auto delete"""
autodelete_days = self.download.config["downloads"]["autodelete_days"]
autodelete_days = self.config["downloads"]["autodelete_days"]
if not autodelete_days:
return
@ -357,81 +348,107 @@ class DownloadPostProcess:
pending.parse_url_list()
_ = pending.add_to_pending(status="ignore")
def refresh_playlist(self) -> list[str]:
def refresh_playlist(self) -> None:
"""match videos with playlists"""
to_refresh = self._get_to_refresh_playlists()
self.add_playlists_to_refresh()
queue = RedisQueue(self.PLAYLIST_QUEUE)
while True:
total = queue.max_score()
playlist_id, idx = queue.get_next()
if not playlist_id or not idx or not total:
break
total_playlist = len(to_refresh)
for idx, playlist_id in enumerate(to_refresh):
playlist = YoutubePlaylist(playlist_id)
playlist.update_playlist(skip_on_empty=True)
if not self.download.task:
if not self.task:
continue
channel_name = playlist.json_data["playlist_channel"]
playlist_title = playlist.json_data["playlist_name"]
message = [
f"Post Processing Playlists for: {channel_name}",
f"{playlist_title} [{idx + 1}/{total_playlist}]",
f"{playlist_title} [{idx}/{total}]",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
return to_refresh
progress = idx / total
self.task.send_progress(message, progress=progress)
def _get_to_refresh_playlists(self) -> list[str]:
"""get playlists to refresh"""
if self.download.task:
def add_playlists_to_refresh(self) -> None:
"""add playlists to refresh"""
if self.task:
message = ["Post Processing Playlists", "Scanning for Playlists"]
self.download.task.send_progress(message)
self.task.send_progress(message)
self._add_playlist_sub()
self._add_channel_playlists()
self._add_video_playlists()
def _add_playlist_sub(self):
    """queue the playlists returned by PlaylistSubscription for full refresh"""
    subscribed = PlaylistSubscription().get_playlists()
    playlist_ids = []
    for playlist in subscribed:
        playlist_ids.append(playlist["playlist_id"])

    # PLAYLIST_QUEUE is the "download:playlist:full" redis queue
    full_queue = RedisQueue(self.PLAYLIST_QUEUE)
    full_queue.add_list(playlist_ids)
def _add_channel_playlists(self):
"""add playlists from channels to refresh"""
queue = RedisQueue(self.CHANNEL_QUEUE)
while True:
channel_id, _ = queue.get_next()
if not channel_id:
break
to_refresh = []
for channel_id in self.download.channels:
channel = YoutubeChannel(channel_id)
channel.get_from_es()
overwrites = channel.get_overwrites()
if "index_playlists" in overwrites:
channel.get_all_playlists()
to_refresh.extend([i[0] for i in channel.all_playlists])
subs = PlaylistSubscription().get_playlists()
for playlist in subs:
playlist_id = playlist["playlist_id"]
if playlist_id not in to_refresh:
to_refresh.append(playlist_id)
return to_refresh
def match_videos(self, to_refresh: list[str]) -> None:
"""scan rest of indexed playlists to match videos"""
must_not = [{"terms": {"playlist_id": to_refresh}}]
video_ids = list(self.download.videos)
to_add = [i[0] for i in channel.all_playlists]
RedisQueue(self.PLAYLIST_QUEUE).add_list(to_add)
def _add_video_playlists(self):
    """queue remaining playlists containing queued videos for quick sync

    Searches ta_playlist for playlists that list any id from the video
    queue in playlist_entries.youtube_id, skipping playlists that are
    already in the full refresh queue.
    """
    # playlists already in the full refresh queue are excluded below
    queued_playlists = RedisQueue(self.PLAYLIST_QUEUE).get_all()
    exclude_queued = [{"terms": {"playlist_id": queued_playlists}}]

    queued_videos = RedisQueue(self.VIDEO_QUEUE).get_all()
    contains_video = [
        {"terms": {"playlist_entries.youtube_id": queued_videos}}
    ]

    bool_query = {"must_not": exclude_queued, "must": contains_video}
    data = {"query": {"bool": bool_query}, "_source": ["playlist_id"]}

    matches = IndexPaginate("ta_playlist", data).get_results()
    quick_ids = [match["playlist_id"] for match in matches]
    RedisQueue(self.PLAYLIST_QUICK).add_list(quick_ids)
def match_videos(self) -> None:
"""scan rest of indexed playlists to match videos"""
queue = RedisQueue(self.PLAYLIST_QUICK)
while True:
total = queue.max_score()
playlist_id, idx = queue.get_next()
if not playlist_id or not idx or not total:
break
total_playlist = len(playlists)
for idx, to_match in enumerate(playlists):
playlist_id = to_match["playlist_id"]
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
playlist.add_vids_to_playlist()
playlist.remove_vids_from_playlist()
if not self.download.task:
if not self.task:
continue
message = [
"Post Processing Playlists.",
f"Validate Playlists: - {idx + 1}/{total_playlist}",
f"Validate Playlists: - {idx}/{total}",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
progress = idx / total
self.task.send_progress(message, progress=progress)
def get_comments(self):
"""get comments from youtube"""
CommentList(self.download.videos, task=self.download.task).index()
video_queue = RedisQueue(self.VIDEO_QUEUE)
comment_list = CommentList(task=self.task)
comment_list.add(video_ids=video_queue.get_all())
video_queue.clear()
comment_list.index()

@ -10,6 +10,7 @@ from datetime import datetime
from home.src.download.yt_dlp_base import YtWrap
from home.src.es.connect import ElasticWrap
from home.src.ta.config import AppConfig
from home.src.ta.ta_redis import RedisQueue
class Comments:
@ -189,20 +190,30 @@ class Comments:
class CommentList:
"""interact with comments in group"""
def __init__(self, video_ids, task=False):
self.video_ids = video_ids
COMMENT_QUEUE = "index:comment"
def __init__(self, task=False):
self.task = task
self.config = AppConfig().config
def index(self):
"""index comments for list, init with task object to notify"""
def add(self, video_ids: list[str]) -> None:
"""add list of videos to get comments, if enabled in config"""
if not self.config["downloads"].get("comment_max"):
return
total_videos = len(self.video_ids)
for idx, youtube_id in enumerate(self.video_ids):
RedisQueue(self.COMMENT_QUEUE).add_list(video_ids)
def index(self):
"""run comment index"""
queue = RedisQueue(self.COMMENT_QUEUE)
while True:
total = queue.max_score()
youtube_id, idx = queue.get_next()
if not youtube_id or not idx or not total:
break
if self.task:
self.notify(idx, total_videos)
self.notify(idx, total)
comment = Comments(youtube_id, config=self.config)
comment.build_json()
@ -211,6 +222,6 @@ class CommentList:
def notify(self, idx, total_videos):
"""send notification on task"""
message = [f"Add comments for new videos {idx + 1}/{total_videos}"]
progress = (idx + 1) / total_videos
message = [f"Add comments for new videos {idx}/{total_videos}"]
progress = idx / total_videos
self.task.send_progress(message, progress=progress)

@ -89,7 +89,9 @@ class Scanner:
)
index_new_video(youtube_id)
CommentList(self.to_index, task=self.task).index()
comment_list = CommentList(task=self.task)
comment_list.add(video_ids=list(self.to_index))
comment_list.index()
def url_fix(self) -> None:
"""

@ -147,7 +147,9 @@ class ImportFolderScanner:
ManualImport(current_video, self.CONFIG).run()
video_ids = [i["video_id"] for i in self.to_import]
CommentList(video_ids, task=self.task).index()
comment_list = CommentList(task=self.task)
comment_list.add(video_ids=video_ids)
comment_list.index()
def _notify(self, idx, current_video):
"""send notification back to task"""

@ -8,6 +8,7 @@ import json
import os
from datetime import datetime
from time import sleep
from typing import Callable, TypedDict
from home.models import CustomPeriodicTask
from home.src.download.subscriptions import ChannelSubscription
@ -23,10 +24,19 @@ from home.src.ta.settings import EnvironmentSettings
from home.src.ta.ta_redis import RedisQueue
class ReindexConfigType(TypedDict):
    """shape of a single entry in ReindexBase.REINDEX_CONFIG"""

    # name of the index the entry refers to, e.g. "ta_video"
    index_name: str
    # name handed to RedisQueue for this entry, e.g. "reindex:ta_video"
    queue_name: str
    # read by ReindexPopulate._get_total_hits
    active_key: str
    # read by ReindexPopulate._get_outdated_ids
    refresh_key: str
class ReindexBase:
"""base config class for reindex task"""
REINDEX_CONFIG = {
REINDEX_CONFIG: dict[str, ReindexConfigType] = {
"video": {
"index_name": "ta_video",
"queue_name": "reindex:ta_video",
@ -53,27 +63,25 @@ class ReindexBase:
def __init__(self):
self.config = AppConfig().config
self.now = int(datetime.now().timestamp())
self.total = None
def populate(self, all_ids, reindex_config):
def populate(self, all_ids, reindex_config: ReindexConfigType):
"""add all to reindex ids to redis queue"""
if not all_ids:
return
RedisQueue(queue_name=reindex_config["queue_name"]).add_list(all_ids)
self.total = None
class ReindexPopulate(ReindexBase):
"""add outdated and recent documents to reindex queue"""
INTERVAL_DEFAIULT = 90
INTERVAL_DEFAIULT: int = 90
def __init__(self):
super().__init__()
self.interval = self.INTERVAL_DEFAIULT
def get_interval(self):
def get_interval(self) -> None:
"""get reindex days interval from task"""
try:
task = CustomPeriodicTask.objects.get(name="check_reindex")
@ -84,7 +92,7 @@ class ReindexPopulate(ReindexBase):
if task_config.get("days"):
self.interval = task_config.get("days")
def add_recent(self):
def add_recent(self) -> None:
"""add recent videos to refresh"""
gte = datetime.fromtimestamp(self.now - self.DAYS3).date().isoformat()
must_list = [
@ -102,10 +110,10 @@ class ReindexPopulate(ReindexBase):
return
all_ids = [i["_source"]["youtube_id"] for i in hits]
reindex_config = self.REINDEX_CONFIG.get("video")
reindex_config: ReindexConfigType = self.REINDEX_CONFIG["video"]
self.populate(all_ids, reindex_config)
def add_outdated(self):
def add_outdated(self) -> None:
"""add outdated documents"""
for reindex_config in self.REINDEX_CONFIG.values():
total_hits = self._get_total_hits(reindex_config)
@ -114,7 +122,7 @@ class ReindexPopulate(ReindexBase):
self.populate(all_ids, reindex_config)
@staticmethod
def _get_total_hits(reindex_config):
def _get_total_hits(reindex_config: ReindexConfigType) -> int:
"""get total hits from index"""
index_name = reindex_config["index_name"]
active_key = reindex_config["active_key"]
@ -126,7 +134,7 @@ class ReindexPopulate(ReindexBase):
return len(total)
def _get_daily_should(self, total_hits):
def _get_daily_should(self, total_hits: int) -> int:
"""calc how many should reindex daily"""
daily_should = int((total_hits // self.interval + 1) * self.MULTIPLY)
if daily_should >= 10000:
@ -134,7 +142,9 @@ class ReindexPopulate(ReindexBase):
return daily_should
def _get_outdated_ids(self, reindex_config, daily_should):
def _get_outdated_ids(
self, reindex_config: ReindexConfigType, daily_should: int
) -> list[str]:
"""get outdated from index_name"""
index_name = reindex_config["index_name"]
refresh_key = reindex_config["refresh_key"]
@ -171,7 +181,7 @@ class ReindexManual(ReindexBase):
self.extract_videos = extract_videos
self.data = False
def extract_data(self, data):
def extract_data(self, data) -> None:
"""process data"""
self.data = data
for key, values in self.data.items():
@ -182,7 +192,9 @@ class ReindexManual(ReindexBase):
self.process_index(reindex_config, values)
def process_index(self, index_config, values):
def process_index(
self, index_config: ReindexConfigType, values: list[str]
) -> None:
"""process values per index"""
index_name = index_config["index_name"]
if index_name == "ta_video":
@ -192,32 +204,35 @@ class ReindexManual(ReindexBase):
elif index_name == "ta_playlist":
self._add_playlists(values)
def _add_videos(self, values):
def _add_videos(self, values: list[str]) -> None:
"""add list of videos to reindex queue"""
if not values:
return
RedisQueue("reindex:ta_video").add_list(values)
queue_name = self.REINDEX_CONFIG["video"]["queue_name"]
RedisQueue(queue_name).add_list(values)
def _add_channels(self, values):
def _add_channels(self, values: list[str]) -> None:
"""add list of channels to reindex queue"""
RedisQueue("reindex:ta_channel").add_list(values)
queue_name = self.REINDEX_CONFIG["channel"]["queue_name"]
RedisQueue(queue_name).add_list(values)
if self.extract_videos:
for channel_id in values:
all_videos = self._get_channel_videos(channel_id)
self._add_videos(all_videos)
def _add_playlists(self, values):
def _add_playlists(self, values: list[str]) -> None:
"""add list of playlists to reindex queue"""
RedisQueue("reindex:ta_playlist").add_list(values)
queue_name = self.REINDEX_CONFIG["playlist"]["queue_name"]
RedisQueue(queue_name).add_list(values)
if self.extract_videos:
for playlist_id in values:
all_videos = self._get_playlist_videos(playlist_id)
self._add_videos(all_videos)
def _get_channel_videos(self, channel_id):
def _get_channel_videos(self, channel_id: str) -> list[str]:
"""get all videos from channel"""
data = {
"query": {"term": {"channel.channel_id": {"value": channel_id}}},
@ -226,7 +241,7 @@ class ReindexManual(ReindexBase):
all_results = IndexPaginate("ta_video", data).get_results()
return [i["youtube_id"] for i in all_results]
def _get_playlist_videos(self, playlist_id):
def _get_playlist_videos(self, playlist_id: str) -> list[str]:
"""get all videos from playlist"""
data = {
"query": {"term": {"playlist.keyword": {"value": playlist_id}}},
@ -248,7 +263,7 @@ class Reindex(ReindexBase):
"playlists": 0,
}
def reindex_all(self):
def reindex_all(self) -> None:
"""reindex all in queue"""
if not self.cookie_is_valid():
print("[reindex] cookie invalid, exiting...")
@ -258,26 +273,26 @@ class Reindex(ReindexBase):
if not RedisQueue(index_config["queue_name"]).length():
continue
self.total = RedisQueue(index_config["queue_name"]).length()
while True:
has_next = self.reindex_index(name, index_config)
if not has_next:
break
self.reindex_type(name, index_config)
def reindex_index(self, name, index_config):
def reindex_type(self, name: str, index_config: ReindexConfigType) -> None:
"""reindex all of a single index"""
reindex = self.get_reindex_map(index_config["index_name"])
youtube_id = RedisQueue(index_config["queue_name"]).get_next()
if youtube_id:
reindex = self._get_reindex_map(index_config["index_name"])
queue = RedisQueue(index_config["queue_name"])
while True:
total = queue.max_score()
youtube_id, idx = queue.get_next()
if not youtube_id or not idx or not total:
break
if self.task:
self._notify(name, index_config)
self._notify(name, total, idx)
reindex(youtube_id)
sleep_interval = self.config["downloads"].get("sleep_interval", 0)
sleep(sleep_interval)
return bool(youtube_id)
def get_reindex_map(self, index_name):
def _get_reindex_map(self, index_name: str) -> Callable:
"""return def to run for index"""
def_map = {
"ta_video": self._reindex_single_video,
@ -285,20 +300,15 @@ class Reindex(ReindexBase):
"ta_playlist": self._reindex_single_playlist,
}
return def_map.get(index_name)
return def_map[index_name]
def _notify(self, name, index_config):
def _notify(self, name: str, total: int, idx: int) -> None:
"""send notification back to task"""
if self.total is None:
self.total = RedisQueue(index_config["queue_name"]).length()
remaining = RedisQueue(index_config["queue_name"]).length()
idx = self.total - remaining
message = [f"Reindexing {name.title()}s {idx}/{self.total}"]
progress = idx / self.total
message = [f"Reindexing {name.title()}s {idx}/{total}"]
progress = idx / total
self.task.send_progress(message, progress=progress)
def _reindex_single_video(self, youtube_id):
def _reindex_single_video(self, youtube_id: str) -> None:
"""refresh data for single video"""
video = YoutubeVideo(youtube_id)
@ -337,9 +347,7 @@ class Reindex(ReindexBase):
Comments(youtube_id, config=self.config).reindex_comments()
self.processed["videos"] += 1
return
def _reindex_single_channel(self, channel_id):
def _reindex_single_channel(self, channel_id: str) -> None:
"""refresh channel data and sync to videos"""
# read current state
channel = YoutubeChannel(channel_id)
@ -370,7 +378,7 @@ class Reindex(ReindexBase):
ChannelFullScan(channel_id).scan()
self.processed["channels"] += 1
def _reindex_single_playlist(self, playlist_id):
def _reindex_single_playlist(self, playlist_id: str) -> None:
"""refresh playlist data"""
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
@ -386,9 +394,8 @@ class Reindex(ReindexBase):
return
self.processed["playlists"] += 1
return
def cookie_is_valid(self):
def cookie_is_valid(self) -> bool:
"""return true if cookie is enabled and valid"""
if not self.config["downloads"]["cookie_import"]:
# is not activated, continue reindex
@ -397,7 +404,7 @@ class Reindex(ReindexBase):
valid = CookieHandler(self.config).validate()
return valid
def build_message(self):
def build_message(self) -> str:
"""build progress message"""
message = ""
for key, value in self.processed.items():
@ -427,7 +434,7 @@ class ReindexProgress(ReindexBase):
self.request_type = request_type
self.request_id = request_id
def get_progress(self):
def get_progress(self) -> dict:
"""get progress from task"""
queue_name, request_type = self._get_queue_name()
total = self._get_total_in_queue(queue_name)

@ -104,6 +104,17 @@ class RedisQueue(RedisBase):
dynamically interact with queues in redis using sorted set
- low score number is first in queue
- add new items with high score number
queue names in use:
download:channel channels during download
download:playlist:full playlists during dl for full refresh
download:playlist:quick playlists during dl for quick refresh
download:video videos during downloads
index:comment videos needing comment indexing
reindex:ta_video reindex videos
reindex:ta_channel reindex channels
reindex:ta_playlist reindex playlists
"""
def __init__(self, queue_name: str):
@ -129,15 +140,29 @@ class RedisQueue(RedisBase):
def add(self, to_add: str) -> None:
    """append a single item to the queue, empty values are ignored"""
    if to_add:
        # score comes from _get_next_score, see there for placement
        score = self._get_next_score()
        self.conn.zadd(self.key, {to_add: score})
def add_list(self, to_add: list) -> None:
    """append all items to the queue in list order, skip if list is empty"""
    if to_add:
        first_score = self._get_next_score()
        # consecutive scores keep the items in the order they were passed
        scored = {
            member: first_score + offset
            for offset, member in enumerate(to_add)
        }
        self.conn.zadd(self.key, scored)
def max_score(self) -> int | None:
"""get max score"""
last = self.conn.zrange(self.key, -1, -1, withscores=True)
if not last:
return None
return int(last[0][1])
def _get_next_score(self) -> float:
"""get next score in queue to append"""
last = self.conn.zrange(self.key, -1, -1, withscores=True)
@ -146,13 +171,15 @@ class RedisQueue(RedisBase):
return last[0][1] + 1
def get_next(self) -> str | bool:
def get_next(self) -> tuple[str | None, int | None]:
"""return next element in the queue, if available"""
result = self.conn.zpopmin(self.key)
if not result:
return False
return None, None
item, idx = result[0][0], int(result[0][1]) + 1
return result[0][0]
return item, idx
def clear(self) -> None:
"""delete list from redis"""

@ -128,7 +128,7 @@ def download_pending(self, auto_only=False):
videos_downloaded = downloader.run_queue(auto_only=auto_only)
if videos_downloaded:
return f"downloaded {len(videos_downloaded)} videos."
return f"downloaded {videos_downloaded} video(s)."
return None

@ -1,14 +1,14 @@
apprise==1.7.6
apprise==1.8.0
celery==5.4.0
Django==5.0.4
Django==5.0.6
django-auth-ldap==4.8.0
django-celery-beat==2.6.0
django-cors-headers==4.3.1
djangorestframework==3.15.1
Pillow==10.3.0
redis==5.0.3
redis==5.0.4
requests==2.31.0
ryd-client==0.0.6
uWSGI==2.0.25.1
whitenoise==6.6.0
yt-dlp @ git+https://github.com/bbilly1/yt-dlp@4935eec0b4f4dffbd86d998a2d3a706875e9d761
yt-dlp @ git+https://github.com/bbilly1/yt-dlp@54b823be28f396608349cca69d52eb4c4b72b8b0

Loading…
Cancel
Save