Compare commits

...

9 Commits

@@ -227,15 +227,18 @@ class PendingList(PendingIndex):
def _parse_playlist(self, url):
"""add all videos of playlist to list"""
playlist = YoutubePlaylist(url)
playlist.build_json()
if not playlist.json_data:
is_active = playlist.update_playlist()
if not is_active:
message = f"{playlist.youtube_id}: failed to extract metadata"
print(message)
raise ValueError(message)
video_results = playlist.json_data.get("playlist_entries")
youtube_ids = [i["youtube_id"] for i in video_results]
for video_id in youtube_ids:
entries = playlist.json_data["playlist_entries"]
to_add = [i["youtube_id"] for i in entries if not i["downloaded"]]
if not to_add:
return
for video_id in to_add:
# match vid_type later
self._add_video(video_id, VideoTypeEnum.UNKNOWN)

@@ -12,6 +12,7 @@ from home.src.index.channel import YoutubeChannel
from home.src.index.playlist import YoutubePlaylist
from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig
from home.src.ta.helper import is_missing
from home.src.ta.urlparser import Parser
@@ -105,10 +106,6 @@ class ChannelSubscription:
if not all_channels:
return False
pending = queue.PendingList()
pending.get_download()
pending.get_indexed()
missing_videos = []
total = len(all_channels)
@@ -118,22 +115,22 @@ class ChannelSubscription:
last_videos = self.get_last_youtube_videos(channel_id)
if last_videos:
ids_to_add = is_missing([i[0] for i in last_videos])
for video_id, _, vid_type in last_videos:
if video_id not in pending.to_skip:
if video_id in ids_to_add:
missing_videos.append((video_id, vid_type))
if not self.task:
continue
if self.task:
if self.task.is_stopped():
self.task.send_progress(["Received Stop signal."])
break
if self.task.is_stopped():
self.task.send_progress(["Received Stop signal."])
break
self.task.send_progress(
message_lines=[f"Scanning Channel {idx + 1}/{total}"],
progress=(idx + 1) / total,
)
self.task.send_progress(
message_lines=[f"Scanning Channel {idx + 1}/{total}"],
progress=(idx + 1) / total,
)
return missing_videos
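
Note on this hunk: find_missing no longer preloads every downloaded or queued id through two full PendingList scans; each channel scan now issues one is_missing() terms query (the helper added at the bottom of this diff) covering only that channel's latest uploads. A condensed sketch of the new per-channel flow, simplified from the hunk rather than copied verbatim:

last_videos = self.get_last_youtube_videos(channel_id)  # [(youtube_id, _, vid_type), ...]
ids_to_add = is_missing([i[0] for i in last_videos])  # one ES terms query per channel
missing_videos += [
    (video_id, vid_type)
    for video_id, _, vid_type in last_videos
    if video_id in ids_to_add
]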
@@ -174,10 +171,6 @@ class PlaylistSubscription:
def process_url_str(self, new_playlists, subscribed=True):
"""process playlist subscribe form url_str"""
data = {"query": {"match_all": {}}, "_source": ["youtube_id"]}
all_indexed = IndexPaginate("ta_video", data).get_results()
all_youtube_ids = [i["youtube_id"] for i in all_indexed]
for idx, playlist in enumerate(new_playlists):
playlist_id = playlist["url"]
if not playlist["type"] == "playlist":
@@ -185,7 +178,6 @@ class PlaylistSubscription:
continue
playlist_h = YoutubePlaylist(playlist_id)
playlist_h.all_youtube_ids = all_youtube_ids
playlist_h.build_json()
if not playlist_h.json_data:
message = f"{playlist_h.youtube_id}: failed to extract data"
@@ -223,27 +215,15 @@ class PlaylistSubscription:
playlist.json_data["playlist_subscribed"] = subscribe_status
playlist.upload_to_es()
@staticmethod
def get_to_ignore():
"""get all youtube_ids already downloaded or ignored"""
pending = queue.PendingList()
pending.get_download()
pending.get_indexed()
return pending.to_skip
def find_missing(self):
"""find videos in subscribed playlists not downloaded yet"""
all_playlists = [i["playlist_id"] for i in self.get_playlists()]
if not all_playlists:
return False
to_ignore = self.get_to_ignore()
missing_videos = []
total = len(all_playlists)
for idx, playlist_id in enumerate(all_playlists):
size_limit = self.config["subscriptions"]["channel_size"]
playlist = YoutubePlaylist(playlist_id)
is_active = playlist.update_playlist()
if not is_active:
@@ -251,27 +231,29 @@ class PlaylistSubscription:
continue
playlist_entries = playlist.json_data["playlist_entries"]
size_limit = self.config["subscriptions"]["channel_size"]
if size_limit:
del playlist_entries[size_limit:]
all_missing = [i for i in playlist_entries if not i["downloaded"]]
for video in all_missing:
youtube_id = video["youtube_id"]
if youtube_id not in to_ignore:
missing_videos.append(youtube_id)
to_check = [
i["youtube_id"]
for i in playlist_entries
if i["downloaded"] is False
]
needs_downloading = is_missing(to_check)
missing_videos.extend(needs_downloading)
if not self.task:
continue
if self.task:
self.task.send_progress(
message_lines=[f"Scanning Playlists {idx + 1}/{total}"],
progress=(idx + 1) / total,
)
if self.task.is_stopped():
self.task.send_progress(["Received Stop signal."])
break
if self.task.is_stopped():
self.task.send_progress(["Received Stop signal."])
break
self.task.send_progress(
message_lines=[f"Scanning Playlists {idx + 1}/{total}"],
progress=(idx + 1) / total,
)
return missing_videos

@@ -24,124 +24,6 @@ from home.src.ta.helper import ignore_filelist
from home.src.ta.settings import EnvironmentSettings
class DownloadPostProcess:
"""handle task to run after download queue finishes"""
def __init__(self, download):
self.download = download
self.now = int(datetime.now().timestamp())
self.pending = False
def run(self):
"""run all functions"""
self.pending = PendingList()
self.pending.get_download()
self.pending.get_channels()
self.pending.get_indexed()
self.auto_delete_all()
self.auto_delete_overwrites()
self.validate_playlists()
self.get_comments()
def auto_delete_all(self):
"""handle auto delete"""
autodelete_days = self.download.config["downloads"]["autodelete_days"]
if not autodelete_days:
return
print(f"auto delete older than {autodelete_days} days")
now_lte = str(self.now - autodelete_days * 24 * 60 * 60)
data = {
"query": {"range": {"player.watched_date": {"lte": now_lte}}},
"sort": [{"player.watched_date": {"order": "asc"}}],
}
self._auto_delete_watched(data)
def auto_delete_overwrites(self):
"""handle per channel auto delete from overwrites"""
for channel_id, value in self.pending.channel_overwrites.items():
if "autodelete_days" in value:
autodelete_days = value.get("autodelete_days")
print(f"{channel_id}: delete older than {autodelete_days}d")
now_lte = str(self.now - autodelete_days * 24 * 60 * 60)
must_list = [
{"range": {"player.watched_date": {"lte": now_lte}}},
{"term": {"channel.channel_id": {"value": channel_id}}},
]
data = {
"query": {"bool": {"must": must_list}},
"sort": [{"player.watched_date": {"order": "desc"}}],
}
self._auto_delete_watched(data)
@staticmethod
def _auto_delete_watched(data):
"""delete watched videos after x days"""
to_delete = IndexPaginate("ta_video", data).get_results()
if not to_delete:
return
for video in to_delete:
youtube_id = video["youtube_id"]
print(f"{youtube_id}: auto delete video")
YoutubeVideo(youtube_id).delete_media_file()
print("add deleted to ignore list")
vids = [{"type": "video", "url": i["youtube_id"]} for i in to_delete]
pending = PendingList(youtube_ids=vids)
pending.parse_url_list()
_ = pending.add_to_pending(status="ignore")
def validate_playlists(self):
"""look for playlist needing to update"""
for id_c, channel_id in enumerate(self.download.channels):
channel = YoutubeChannel(channel_id, task=self.download.task)
overwrites = self.pending.channel_overwrites.get(channel_id, False)
if overwrites and overwrites.get("index_playlists"):
# validate from remote
channel.index_channel_playlists()
continue
# validate from local
playlists = channel.get_indexed_playlists(active_only=True)
all_channel_playlist = [i["playlist_id"] for i in playlists]
self._validate_channel_playlist(all_channel_playlist, id_c)
def _validate_channel_playlist(self, all_channel_playlist, id_c):
"""scan channel for playlist needing update"""
all_youtube_ids = [i["youtube_id"] for i in self.pending.all_videos]
for id_p, playlist_id in enumerate(all_channel_playlist):
playlist = YoutubePlaylist(playlist_id)
playlist.all_youtube_ids = all_youtube_ids
playlist.build_json(scrape=True)
if not playlist.json_data:
playlist.deactivate()
continue
playlist.add_vids_to_playlist()
playlist.upload_to_es()
self._notify_playlist_progress(all_channel_playlist, id_c, id_p)
def _notify_playlist_progress(self, all_channel_playlist, id_c, id_p):
"""notify to UI"""
if not self.download.task:
return
total_channel = len(self.download.channels)
total_playlist = len(all_channel_playlist)
message = [
f"Post Processing Channels: {id_c}/{total_channel}",
f"Validate Playlists {id_p + 1}/{total_playlist}",
]
progress = (id_c + 1) / total_channel
self.download.task.send_progress(message, progress=progress)
def get_comments(self):
"""get comments from youtube"""
CommentList(self.download.videos, task=self.download.task).index()
class VideoDownloader:
"""
handle the video download functionality
@@ -423,3 +305,162 @@ class VideoDownloader:
updated = response.get("updated")
if updated:
print(f"[download] reset auto start on {updated} videos.")
class DownloadPostProcess:
"""handle task to run after download queue finishes"""
def __init__(self, download: VideoDownloader) -> None:
self.download: VideoDownloader = download
self.now = int(datetime.now().timestamp())
self.channel_overwrites: dict | None = None
def run(self):
"""run all functions"""
self.channel_overwrites = self.get_channel_overwrites()
self.auto_delete_all()
self.auto_delete_overwrites()
to_refresh = self.refresh_playlist()
self.match_videos(to_refresh)
self.get_comments()
def get_channel_overwrites(self):
"""get overwrites"""
data = {
"query": {
"bool": {"must": [{"exists": {"field": "channel_overwrites"}}]}
},
"_source": ["channel_id", "channel_overwrites"],
}
result = IndexPaginate("ta_channel", data).get_results()
overwrites = {i["channel_id"]: i["channel_overwrites"] for i in result}
return overwrites
def auto_delete_all(self):
"""handle auto delete"""
autodelete_days = self.download.config["downloads"]["autodelete_days"]
if not autodelete_days:
return
print(f"auto delete older than {autodelete_days} days")
now_lte = str(self.now - autodelete_days * 24 * 60 * 60)
data = {
"query": {"range": {"player.watched_date": {"lte": now_lte}}},
"sort": [{"player.watched_date": {"order": "asc"}}],
}
self._auto_delete_watched(data)
def auto_delete_overwrites(self):
"""handle per channel auto delete from overwrites"""
for channel_id, value in self.channel_overwrites.items():
if "autodelete_days" in value:
autodelete_days = value.get("autodelete_days")
print(f"{channel_id}: delete older than {autodelete_days}d")
now_lte = str(self.now - autodelete_days * 24 * 60 * 60)
must_list = [
{"range": {"player.watched_date": {"lte": now_lte}}},
{"term": {"channel.channel_id": {"value": channel_id}}},
]
data = {
"query": {"bool": {"must": must_list}},
"sort": [{"player.watched_date": {"order": "desc"}}],
}
self._auto_delete_watched(data)
@staticmethod
def _auto_delete_watched(data):
"""delete watched videos after x days"""
to_delete = IndexPaginate("ta_video", data).get_results()
if not to_delete:
return
for video in to_delete:
youtube_id = video["youtube_id"]
print(f"{youtube_id}: auto delete video")
YoutubeVideo(youtube_id).delete_media_file()
print("add deleted to ignore list")
vids = [{"type": "video", "url": i["youtube_id"]} for i in to_delete]
pending = PendingList(youtube_ids=vids)
pending.parse_url_list()
_ = pending.add_to_pending(status="ignore")
def refresh_playlist(self) -> list[str]:
"""match videos with playlists"""
to_refresh = self._get_to_refresh_playlists()
total_playlist = len(to_refresh)
for idx, playlist_id in enumerate(to_refresh):
playlist = YoutubePlaylist(playlist_id)
playlist.update_playlist(skip_on_empty=True)
if not self.download.task:
continue
channel_name = playlist.json_data["playlist_channel"]
playlist_title = playlist.json_data["playlist_name"]
message = [
f"Post Processing Playlists for: {channel_name}",
f"Validate: {playlist_title} - {idx + 1}/{total_playlist}",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
return to_refresh
def _get_to_refresh_playlists(self) -> list[str]:
"""get playlists to refresh"""
if self.download.task:
message = ["Post Processing Playlists", "Scanning for Playlists"]
self.download.task.send_progress(message)
to_refresh = []
for channel_id in self.download.channels:
channel = YoutubeChannel(channel_id)
channel.get_from_es()
overwrites = channel.get_overwrites()
if overwrites and overwrites.get("index_playlists"):
channel.get_all_playlists()
to_refresh.extend(channel.all_playlists)
subs = PlaylistSubscription().get_playlists()
for playlist in subs:
playlist_id = playlist["playlist_id"]
if playlist_id not in to_refresh:
to_refresh.append(playlist_id)
return to_refresh
def match_videos(self, to_refresh: list[str]) -> None:
"""scan rest of indexed playlists to match videos"""
must_not = [{"terms": {"playlist_id": to_refresh}}]
video_ids = list(self.download.videos)
must = [{"terms": {"playlist_entries.youtube_id": video_ids}}]
data = {
"query": {"bool": {"must_not": must_not, "must": must}},
"_source": ["playlist_id"],
}
playlists = IndexPaginate("ta_playlist", data).get_results()
total_playlist = len(playlists)
for idx, to_match in enumerate(playlists):
playlist_id = to_match["playlist_id"]
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
playlist.add_vids_to_playlist()
playlist.remove_vids_from_playlist()
if not self.download.task:
continue
message = [
"Post Processing Playlists.",
f"Validate Playlists: - {idx + 1}/{total_playlist}",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
def get_comments(self):
"""get comments from youtube"""
CommentList(self.download.videos, task=self.download.task).index()
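
Note on get_channel_overwrites: it replaces the old dependency on PendingList.channel_overwrites with a direct ta_channel query, flattened into a plain dict keyed by channel id. A sketch of the expected shape, with hypothetical ids and values not taken from the diff:

overwrites = {
    "UC_example_channel_a": {"autodelete_days": 30},
    "UC_example_channel_b": {"index_playlists": True},
}
# auto_delete_overwrites() walks this mapping and runs one ranged
# _auto_delete_watched query per channel that sets autodelete_days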

@@ -8,7 +8,6 @@ import json
import os
from datetime import datetime
from home.src.download import queue # partial import
from home.src.download.thumbnails import ThumbManager
from home.src.download.yt_dlp_base import YtWrap
from home.src.es.connect import ElasticWrap, IndexPaginate
@@ -267,13 +266,12 @@ class YoutubeChannel(YouTubeItem):
print(f"{self.youtube_id}: no playlists found.")
return
all_youtube_ids = self.get_all_video_ids()
total = len(self.all_playlists)
for idx, playlist in enumerate(self.all_playlists):
if self.task:
self._notify_single_playlist(idx, total)
self._index_single_playlist(playlist, all_youtube_ids)
self._index_single_playlist(playlist)
print("add playlist: " + playlist[1])
def _notify_single_playlist(self, idx, total):
@@ -286,32 +284,10 @@ class YoutubeChannel(YouTubeItem):
self.task.send_progress(message, progress=(idx + 1) / total)
@staticmethod
def _index_single_playlist(playlist, all_youtube_ids):
def _index_single_playlist(playlist):
"""add single playlist if needed"""
playlist = YoutubePlaylist(playlist[0])
playlist.all_youtube_ids = all_youtube_ids
playlist.build_json()
if not playlist.json_data:
return
entries = playlist.json_data["playlist_entries"]
downloaded = [i for i in entries if i["downloaded"]]
if not downloaded:
return
playlist.upload_to_es()
playlist.add_vids_to_playlist()
playlist.get_playlist_art()
@staticmethod
def get_all_video_ids():
"""match all playlists with videos"""
handler = queue.PendingList()
handler.get_download()
handler.get_indexed()
all_youtube_ids = [i["youtube_id"] for i in handler.all_videos]
return all_youtube_ids
playlist.update_playlist(skip_on_empty=True)
def get_channel_videos(self):
"""get all videos from channel"""

@@ -8,7 +8,7 @@ import json
from datetime import datetime
from home.src.download.thumbnails import ThumbManager
from home.src.es.connect import ElasticWrap
from home.src.es.connect import ElasticWrap, IndexPaginate
from home.src.index.generic import YouTubeItem
from home.src.index.video import YoutubeVideo
@@ -28,7 +28,6 @@ class YoutubePlaylist(YouTubeItem):
super().__init__(youtube_id)
self.all_members = False
self.nav = False
self.all_youtube_ids = []
def build_json(self, scrape=False):
"""collection to create json_data"""
@@ -45,7 +44,8 @@ class YoutubePlaylist(YouTubeItem):
return
self.process_youtube_meta()
self.get_entries()
ids_found = self.get_local_vids()
self.get_entries(ids_found)
self.json_data["playlist_entries"] = self.all_members
self.json_data["playlist_subscribed"] = subscribed
@@ -69,25 +69,31 @@ class YoutubePlaylist(YouTubeItem):
"playlist_type": "regular",
}
def get_entries(self, playlistend=False):
"""get all videos in playlist"""
if playlistend:
# implement playlist end
print(playlistend)
def get_local_vids(self) -> list[str]:
"""get local video ids from youtube entries"""
entries = self.youtube_meta["entries"]
data = {
"query": {"terms": {"youtube_id": [i["id"] for i in entries]}},
"_source": ["youtube_id"],
}
indexed_vids = IndexPaginate("ta_video", data).get_results()
ids_found = [i["youtube_id"] for i in indexed_vids]
return ids_found
def get_entries(self, ids_found) -> None:
"""get all videos in playlist, match downloaded with ids_found"""
all_members = []
for idx, entry in enumerate(self.youtube_meta["entries"]):
if self.all_youtube_ids:
downloaded = entry["id"] in self.all_youtube_ids
else:
downloaded = False
if not entry["channel"]:
continue
to_append = {
"youtube_id": entry["id"],
"title": entry["title"],
"uploader": entry["channel"],
"idx": idx,
"downloaded": downloaded,
"downloaded": entry["id"] in ids_found,
}
all_members.append(to_append)
@@ -128,17 +134,50 @@ class YoutubePlaylist(YouTubeItem):
ElasticWrap("_bulk").post(query_str, ndjson=True)
def update_playlist(self):
def remove_vids_from_playlist(self):
"""remove playlist ids from videos if needed"""
needed = [i["youtube_id"] for i in self.json_data["playlist_entries"]]
data = {
"query": {"match": {"playlist": self.youtube_id}},
"_source": ["youtube_id"],
}
result = IndexPaginate("ta_video", data).get_results()
to_remove = [
i["youtube_id"] for i in result if i["youtube_id"] not in needed
]
s = "ctx._source.playlist.removeAll(Collections.singleton(params.rm))"
for video_id in to_remove:
query = {
"script": {
"source": s,
"lang": "painless",
"params": {"rm": self.youtube_id},
},
"query": {"match": {"youtube_id": video_id}},
}
path = "ta_video/_update_by_query"
_, status_code = ElasticWrap(path).post(query)
if status_code == 200:
print(f"{self.youtube_id}: removed {video_id} from playlist")
def update_playlist(self, skip_on_empty=False):
"""update metadata for playlist with data from YouTube"""
self.get_from_es()
subscribed = self.json_data["playlist_subscribed"]
self.get_from_youtube()
self.build_json(scrape=True)
if not self.json_data:
# return false to deactivate
return False
self.json_data["playlist_subscribed"] = subscribed
if skip_on_empty:
has_item_downloaded = any(
i["downloaded"] for i in self.json_data["playlist_entries"]
)
if not has_item_downloaded:
return True
self.upload_to_es()
self.add_vids_to_playlist()
self.remove_vids_from_playlist()
self.get_playlist_art()
return True
def build_nav(self, youtube_id):
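
Two behaviors in this hunk are worth calling out. With skip_on_empty=True, update_playlist still refreshes metadata from YouTube but returns early, without writing to ES, when no entry is downloaded yet. And remove_vids_from_playlist issues one _update_by_query per stale video, using a painless removeAll script; roughly, each POST body looks like the sketch below (ids are illustrative, script and endpoint as in the diff):

query = {
    "script": {
        "source": "ctx._source.playlist.removeAll(Collections.singleton(params.rm))",
        "lang": "painless",
        "params": {"rm": "PL_example_playlist_id"},  # illustrative playlist id
    },
    "query": {"match": {"youtube_id": "example_video_id"}},  # illustrative video id
}
# POST ta_video/_update_by_query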

@@ -10,7 +10,6 @@ from datetime import datetime
from time import sleep
from home.models import CustomPeriodicTask
from home.src.download.queue import PendingList
from home.src.download.subscriptions import ChannelSubscription
from home.src.download.thumbnails import ThumbManager
from home.src.download.yt_dlp_base import CookieHandler
@@ -243,7 +242,6 @@ class Reindex(ReindexBase):
def __init__(self, task=False):
super().__init__()
self.task = task
self.all_indexed_ids = False
self.processed = {
"videos": 0,
"channels": 0,
@@ -374,7 +372,6 @@ class Reindex(ReindexBase):
def _reindex_single_playlist(self, playlist_id):
"""refresh playlist data"""
self._get_all_videos()
playlist = YoutubePlaylist(playlist_id)
playlist.get_from_es()
if (
@@ -383,28 +380,14 @@ class Reindex(ReindexBase):
):
return
subscribed = playlist.json_data["playlist_subscribed"]
playlist.all_youtube_ids = self.all_indexed_ids
playlist.build_json(scrape=True)
if not playlist.json_data:
is_active = playlist.update_playlist()
if not is_active:
playlist.deactivate()
return
playlist.json_data["playlist_subscribed"] = subscribed
playlist.upload_to_es()
self.processed["playlists"] += 1
return
def _get_all_videos(self):
"""add all videos for playlist index validation"""
if self.all_indexed_ids:
return
handler = PendingList()
handler.get_download()
handler.get_indexed()
self.all_indexed_ids = [i["youtube_id"] for i in handler.all_videos]
def cookie_is_valid(self):
"""return true if cookie is enabled and valid"""
if not self.config["downloads"]["cookie_import"]:

@@ -12,6 +12,7 @@ from datetime import datetime
from urllib.parse import urlparse
import requests
from home.src.es.connect import IndexPaginate
from home.src.ta.settings import EnvironmentSettings
@@ -222,3 +223,21 @@ def check_stylesheet(stylesheet: str):
return stylesheet
return "dark.css"
def is_missing(
to_check: str | list[str], index_name: str = "ta_video,ta_download"
) -> list[str]:
"""id or list of ids that are missing from index_name"""
if isinstance(to_check, str):
to_check = [to_check]
data = {
"query": {"terms": {"youtube_id": to_check}},
"_source": ["youtube_id"],
}
result = IndexPaginate(index_name, data=data).get_results()
existing_ids = [i["youtube_id"] for i in result]
dl = [i for i in to_check if i not in existing_ids]
return dl
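
The helper inverts an ES terms lookup: any id not found in ta_video or ta_download still needs handling. A minimal usage sketch (ids are hypothetical, and a reachable Elasticsearch behind IndexPaginate is assumed):

from home.src.ta.helper import is_missing

# a single id is wrapped into a list internally
to_queue = is_missing("example_video_id")

# batch check: returns only the ids absent from both indices,
# preserving input order
to_queue = is_missing(["id_a", "id_b", "id_c"])
for video_id in to_queue:
    print(f"{video_id}: neither indexed nor in the download queue")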
