Compare commits

...

5 Commits

@@ -11,6 +11,7 @@ from time import sleep
import requests
from home.src.es.connect import ElasticWrap, IndexPaginate
from home.src.ta.helper import is_missing
from home.src.ta.settings import EnvironmentSettings
from mutagen.mp4 import MP4, MP4Cover
from PIL import Image, ImageFile, ImageFilter, UnidentifiedImageError
@@ -326,7 +327,7 @@ class ThumbValidator:
},
]
def __init__(self, task):
def __init__(self, task=False):
self.task = task
def validate(self):
@@ -346,6 +347,89 @@ class ThumbValidator:
)
_ = paginate.get_results()
def clean_up(self):
"""clean up all thumbs"""
self._clean_up_vids()
self._clean_up_channels()
self._clean_up_playlists()
def _clean_up_vids(self):
"""clean unneeded vid thumbs"""
video_dir = os.path.join(EnvironmentSettings.CACHE_DIR, "videos")
video_folders = os.listdir(video_dir)
for video_folder in video_folders:
folder_path = os.path.join(video_dir, video_folder)
thumbs_is = {i.split(".")[0] for i in os.listdir(folder_path)}
thumbs_should = self._get_vid_thumbs_should(video_folder)
to_delete = thumbs_is - thumbs_should
for thumb in to_delete:
delete_path = os.path.join(folder_path, f"{thumb}.jpg")
os.remove(delete_path)
if to_delete:
message = (
f"[thumbs][video][{video_folder}] "
+ f"delete {len(to_delete)} unused thumbnails"
)
print(message)
if self.task:
self.task.send_progress([message])
@staticmethod
def _get_vid_thumbs_should(video_folder: str) -> set[str]:
"""get indexed"""
should_list = [
{"prefix": {"youtube_id": {"value": video_folder.lower()}}},
{"prefix": {"youtube_id": {"value": video_folder.upper()}}},
]
data = {
"query": {"bool": {"should": should_list}},
"_source": ["youtube_id"],
}
result = IndexPaginate("ta_video", data).get_results()
thumbs_should = {i["youtube_id"] for i in result}
return thumbs_should
def _clean_up_channels(self):
"""clean unneeded channel thumbs"""
channel_dir = os.path.join(EnvironmentSettings.CACHE_DIR, "channels")
channel_art = os.listdir(channel_dir)
thumbs_is = {"_".join(i.split("_")[:-1]) for i in channel_art}
to_delete = is_missing(list(thumbs_is), "ta_channel", "channel_id")
for channel_thumb in channel_art:
if channel_thumb[:24] in to_delete:
delete_path = os.path.join(channel_dir, channel_thumb)
os.remove(delete_path)
if to_delete:
message = (
"[thumbs][channel] "
+ f"delete {len(to_delete)} unused channel art"
)
print(message)
if self.task:
self.task.send_progress([message])
def _clean_up_playlists(self):
"""clean up unneeded playlist thumbs"""
playlist_dir = os.path.join(EnvironmentSettings.CACHE_DIR, "playlists")
playlist_art = os.listdir(playlist_dir)
thumbs_is = {i.split(".")[0] for i in playlist_art}
to_delete = is_missing(list(thumbs_is), "ta_playlist", "playlist_id")
for playlist_id in to_delete:
delete_path = os.path.join(playlist_dir, f"{playlist_id}.jpg")
os.remove(delete_path)
if to_delete:
message = (
"[thumbs][playlist] "
+ f"delete {len(to_delete)} unused playlist art"
)
print(message)
if self.task:
self.task.send_progress([message])
@staticmethod
def _get_total(index_name):
"""get total documents in index"""

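The new clean-up helpers above all follow the same pattern: list what is cached on disk, look up the IDs that should exist in the index, and delete the set difference. A minimal standalone sketch of that pattern for the video/playlist layout, with the IndexPaginate lookup replaced by a plain known_ids set (the function name, path layout, and that substitution are illustrative assumptions):

import os

def clean_cache_dir(cache_dir: str, known_ids: set[str]) -> list[str]:
    """remove cached .jpg files whose basename is not in known_ids"""
    on_disk = {name.split(".")[0] for name in os.listdir(cache_dir)}
    to_delete = on_disk - known_ids  # cached but no longer indexed
    for file_id in to_delete:
        os.remove(os.path.join(cache_dir, f"{file_id}.jpg"))
    return sorted(to_delete)

# usage sketch: clean_cache_dir("/cache/playlists", {"PL-one", "PL-two"})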
@@ -30,42 +30,38 @@ class VideoDownloader:
if not initiated with list, take from queue
"""
def __init__(self, youtube_id_list=False, task=False):
CACHE_DIR = EnvironmentSettings.CACHE_DIR
MEDIA_DIR = EnvironmentSettings.MEDIA_DIR
def __init__(self, task=False):
self.obs = False
self.video_overwrites = False
self.youtube_id_list = youtube_id_list
self.channel_overwrites = get_channel_overwrites()
self.task = task
self.config = AppConfig().config
self.cache_dir = EnvironmentSettings.CACHE_DIR
self.media_dir = EnvironmentSettings.MEDIA_DIR
self._build_obs()
self.channels = set()
self.videos = set()
def run_queue(self, auto_only=False):
"""setup download queue in redis loop until no more items"""
self._get_overwrites()
while True:
video_data = self._get_next(auto_only)
if self.task.is_stopped() or not video_data:
self._reset_auto()
break
youtube_id = video_data.get("youtube_id")
youtube_id = video_data["youtube_id"]
channel_id = video_data["channel_id"]
print(f"{youtube_id}: Downloading video")
self._notify(video_data, "Validate download format")
success = self._dl_single_vid(youtube_id)
success = self._dl_single_vid(youtube_id, channel_id)
if not success:
continue
self._notify(video_data, "Add video metadata to index", progress=1)
vid_dict = index_new_video(
youtube_id,
video_overwrites=self.video_overwrites,
video_type=VideoTypeEnum(video_data["vid_type"]),
)
video_type = VideoTypeEnum(video_data["vid_type"])
vid_dict = index_new_video(youtube_id, video_type=video_type)
self.channels.add(vid_dict["channel"]["channel_id"])
self.videos.add(vid_dict["youtube_id"])
@@ -112,13 +108,6 @@ class VideoDownloader:
return response["hits"]["hits"][0]["_source"]
def _get_overwrites(self):
"""get channel overwrites"""
pending = PendingList()
pending.get_download()
pending.get_channels()
self.video_overwrites = pending.video_overwrites
def _progress_hook(self, response):
"""process the progress_hooks from yt_dlp"""
progress = False
@@ -149,7 +138,7 @@
"""initial obs"""
self.obs = {
"merge_output_format": "mp4",
"outtmpl": (self.cache_dir + "/download/%(id)s.mp4"),
"outtmpl": (self.CACHE_DIR + "/download/%(id)s.mp4"),
"progress_hooks": [self._progress_hook],
"noprogress": True,
"continuedl": True,
@@ -209,22 +198,17 @@
self.obs["postprocessors"] = postprocessors
def get_format_overwrites(self, youtube_id):
"""get overwrites from single video"""
overwrites = self.video_overwrites.get(youtube_id, False)
if overwrites:
return overwrites.get("download_format", False)
return False
def _set_overwrites(self, obs: dict, channel_id: str) -> None:
"""add overwrites to obs"""
overwrites = self.channel_overwrites.get(channel_id)
if overwrites and overwrites.get("download_format"):
obs["format"] = overwrites.get("download_format")
def _dl_single_vid(self, youtube_id):
def _dl_single_vid(self, youtube_id: str, channel_id: str) -> bool:
"""download single video"""
obs = self.obs.copy()
format_overwrite = self.get_format_overwrites(youtube_id)
if format_overwrite:
obs["format"] = format_overwrite
dl_cache = self.cache_dir + "/download/"
self._set_overwrites(obs, channel_id)
dl_cache = os.path.join(self.CACHE_DIR, "download")
# check if already in cache to continue from there
all_cached = ignore_filelist(os.listdir(dl_cache))
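With per-video overwrites removed, _dl_single_vid now receives the channel_id and lets _set_overwrites swap the channel's format string into a copy of the base yt-dlp options. A hedged sketch of that option handling, with a hypothetical overwrites mapping standing in for get_channel_overwrites() (names, paths, and format values here are illustrative):

BASE_OBS = {
    "merge_output_format": "mp4",
    "outtmpl": "/cache/download/%(id)s.mp4",  # illustrative path
    "noprogress": True,
    "continuedl": True,
}

# hypothetical per-channel overwrites keyed by channel_id
CHANNEL_OVERWRITES = {"UC-example": {"download_format": "bv*[height<=1080]+ba/b"}}

def build_obs(channel_id: str) -> dict:
    """copy the base options and apply the channel's download_format, if any"""
    obs = BASE_OBS.copy()
    overwrites = CHANNEL_OVERWRITES.get(channel_id)
    if overwrites and overwrites.get("download_format"):
        obs["format"] = overwrites["download_format"]
    return obs

print(build_obs("UC-example")["format"])  # bv*[height<=1080]+ba/b

Because the copy happens per video, a format overwrite for one channel never leaks into the next download.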
@@ -258,7 +242,7 @@ class VideoDownloader:
host_gid = EnvironmentSettings.HOST_GID
# make folder
folder = os.path.join(
self.media_dir, vid_dict["channel"]["channel_id"]
self.MEDIA_DIR, vid_dict["channel"]["channel_id"]
)
if not os.path.exists(folder):
os.makedirs(folder)
@@ -266,8 +250,8 @@
os.chown(folder, host_uid, host_gid)
# move media file
media_file = vid_dict["youtube_id"] + ".mp4"
old_path = os.path.join(self.cache_dir, "download", media_file)
new_path = os.path.join(self.media_dir, vid_dict["media_url"])
old_path = os.path.join(self.CACHE_DIR, "download", media_file)
new_path = os.path.join(self.MEDIA_DIR, vid_dict["media_url"])
# move media file and fix permission
shutil.move(old_path, new_path, copy_function=shutil.copyfile)
if host_uid and host_gid:
@@ -382,14 +366,14 @@ class DownloadPostProcess:
playlist = YoutubePlaylist(playlist_id)
playlist.update_playlist(skip_on_empty=True)
if self.download.task:
if not self.download.task:
continue
channel_name = playlist.json_data["playlist_channel"]
playlist_title = playlist.json_data["playlist_name"]
message = [
f"Post Processing Playlists for: {channel_name}",
f"Validate: {playlist_title} - {idx + 1}/{total_playlist}",
f"{playlist_title} [{idx + 1}/{total_playlist}]",
]
progress = (idx + 1) / total_playlist
self.download.task.send_progress(message, progress=progress)
@@ -407,9 +391,9 @@ class DownloadPostProcess:
channel = YoutubeChannel(channel_id)
channel.get_from_es()
overwrites = channel.get_overwrites()
if overwrites and overwrites.get("index_playlists"):
if "index_playlists" in overwrites:
channel.get_all_playlists()
to_refresh.extend(channel.all_playlists)
to_refresh.extend([i[0] for i in channel.all_playlists])
subs = PlaylistSubscription().get_playlists()
for playlist in subs:

@@ -225,9 +225,7 @@ class YoutubeChannel(YouTubeItem):
"""delete all indexed playlist from es"""
all_playlists = self.get_indexed_playlists()
for playlist in all_playlists:
playlist_id = playlist["playlist_id"]
playlist = YoutubePlaylist(playlist_id)
YoutubePlaylist(playlist_id).delete_metadata()
YoutubePlaylist(playlist["playlist_id"]).delete_metadata()
def delete_channel(self):
"""delete channel and all videos"""
@@ -324,9 +322,9 @@ class YoutubeChannel(YouTubeItem):
all_playlists = IndexPaginate("ta_playlist", data).get_results()
return all_playlists
def get_overwrites(self):
def get_overwrites(self) -> dict:
"""get all per channel overwrites"""
return self.json_data.get("channel_overwrites", False)
return self.json_data.get("channel_overwrites", {})
def set_overwrites(self, overwrites):
"""set per channel overwrites"""

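Returning {} instead of False from get_overwrites lets callers use membership tests and .get() without a truthiness guard, which is what the "index_playlists" in overwrites check above relies on. A small illustration (the function name and sample documents are mine):

def should_index_playlists(channel_json: dict) -> bool:
    """true when the channel carries the index_playlists overwrite"""
    overwrites: dict = channel_json.get("channel_overwrites", {})
    # an empty-dict default keeps the membership test safe
    return "index_playlists" in overwrites

print(should_index_playlists({"channel_overwrites": {"index_playlists": True}}))  # True
print(should_index_playlists({}))  # False

Note that the membership test is also true when the stored value is falsy, unlike the previous overwrites.get("index_playlists") check.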
@@ -168,7 +168,7 @@ class YoutubePlaylist(YouTubeItem):
return False
if skip_on_empty:
has_item_downloaded = next(
has_item_downloaded = any(
i["downloaded"] for i in self.json_data["playlist_entries"]
)
if not has_item_downloaded:

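The switch from next() to any() fixes two edge cases: next() only inspects the first entry's downloaded flag and raises StopIteration on a playlist with no entries, while any() scans every entry and returns False when the list is empty. A quick demonstration with made-up entries:

playlist_entries = [{"downloaded": False}, {"downloaded": True}]

# next() only returns the first flag and would raise StopIteration on []
first_flag = next(i["downloaded"] for i in playlist_entries)          # False
# any() checks every entry and handles an empty list gracefully
has_item_downloaded = any(i["downloaded"] for i in playlist_entries)  # True

print(first_flag, has_item_downloaded)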
@@ -125,15 +125,9 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
index_name = "ta_video"
yt_base = "https://www.youtube.com/watch?v="
def __init__(
self,
youtube_id,
video_overwrites=False,
video_type=VideoTypeEnum.VIDEOS,
):
def __init__(self, youtube_id, video_type=VideoTypeEnum.VIDEOS):
super().__init__(youtube_id)
self.channel_id = False
self.video_overwrites = video_overwrites
self.video_type = video_type
self.offline_import = False
@@ -165,13 +159,12 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
"""check if need to run sponsor block"""
integrate = self.config["downloads"]["integrate_sponsorblock"]
if self.video_overwrites:
single_overwrite = self.video_overwrites.get(self.youtube_id)
if not single_overwrite:
if overwrite := self.json_data["channel"].get("channel_overwrites"):
if not overwrite:
return integrate
if "integrate_sponsorblock" in single_overwrite:
return single_overwrite.get("integrate_sponsorblock")
if "integrate_sponsorblock" in overwrite:
return overwrite.get("integrate_sponsorblock")
return integrate
@@ -399,13 +392,9 @@ class YoutubeVideo(YouTubeItem, YoutubeSubtitle):
_, _ = ElasticWrap(path).post(data=data)
def index_new_video(
youtube_id, video_overwrites=False, video_type=VideoTypeEnum.VIDEOS
):
def index_new_video(youtube_id, video_type=VideoTypeEnum.VIDEOS):
"""combined classes to create new video in index"""
video = YoutubeVideo(
youtube_id, video_overwrites=video_overwrites, video_type=video_type
)
video = YoutubeVideo(youtube_id, video_type=video_type)
video.build_json()
if not video.json_data:
raise ValueError("failed to get metadata for " + youtube_id)

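With video_overwrites gone from YoutubeVideo, the sponsorblock decision reads the channel-level overwrite straight out of the indexed document and falls back to the global setting. A standalone sketch of my reading of that check (the function name and the sample config/json_data values are illustrative):

def get_sponsorblock_setting(config: dict, json_data: dict) -> bool:
    """channel overwrite wins over the global integrate_sponsorblock flag"""
    integrate = config["downloads"]["integrate_sponsorblock"]
    if overwrite := json_data["channel"].get("channel_overwrites"):
        if "integrate_sponsorblock" in overwrite:
            return overwrite["integrate_sponsorblock"]
    return integrate

config = {"downloads": {"integrate_sponsorblock": False}}
video = {"channel": {"channel_overwrites": {"integrate_sponsorblock": True}}}
print(get_sponsorblock_setting(config, video))  # True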
@@ -227,18 +227,20 @@ def check_stylesheet(stylesheet: str):
def is_missing(
to_check: str | list[str], index_name: str = "ta_video,ta_download"
to_check: str | list[str],
index_name: str = "ta_video,ta_download",
on_key: str = "youtube_id",
) -> list[str]:
"""id or list of ids that are missing from index_name"""
if isinstance(to_check, str):
to_check = [to_check]
data = {
"query": {"terms": {"youtube_id": to_check}},
"_source": ["youtube_id"],
"query": {"terms": {on_key: to_check}},
"_source": [on_key],
}
result = IndexPaginate(index_name, data=data).get_results()
existing_ids = [i["youtube_id"] for i in result]
existing_ids = [i[on_key] for i in result]
dl = [i for i in to_check if i not in existing_ids]
return dl

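Parameterizing is_missing on on_key is what lets the thumbnail clean-up above reuse it for channel_id and playlist_id lookups. A query-free sketch of the request body it builds, with the IndexPaginate round-trip left out (the function name is mine):

def build_missing_query(to_check: str | list[str], on_key: str = "youtube_id") -> dict:
    """terms query selecting existing documents by on_key"""
    if isinstance(to_check, str):
        to_check = [to_check]
    return {
        "query": {"terms": {on_key: to_check}},
        "_source": [on_key],
    }

print(build_missing_query(["UC-one", "UC-two"], on_key="channel_id"))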
@@ -259,7 +259,9 @@ def thumbnail_check(self):
return
manager.init(self)
ThumbValidator(task=self).validate()
thumbnail = ThumbValidator(task=self)
thumbnail.validate()
thumbnail.clean_up()
@shared_task(bind=True, name="resync_thumbs", base=BaseTask)
