You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
catcli/catcli/noder.py

743 lines
26 KiB
Python

7 years ago
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6

Class that processes nodes in the catalog tree
"""
import os
import shutil
7 years ago
import time
from typing import List, Union, Tuple, Any, Optional, Dict, cast
import fnmatch
4 months ago
import anytree
from natsort import os_sort_keygen
7 years ago
# local imports
1 year ago
from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \
10 months ago
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node
3 months ago
from catcli.utils import md5sum
7 years ago
from catcli.logger import Logger
4 months ago
from catcli.printer_native import NativePrinter
from catcli.printer_csv import CsvPrinter
from catcli.decomp import Decomp
2 years ago
from catcli.version import __version__ as VERSION
2 years ago
from catcli.exceptions import CatcliException
7 years ago
2 years ago
class Noder:
    """
    handles node in the catalog tree
    The methods of this class deal with the following node types:
    * "top" node representing the top node (generic node)
    * "storage" node representing a storage
    * "dir" node representing a directory
    * "file" node representing a file
    * archived node (nodes.TYPE_ARCHIVED) created for archive data
    * meta node (nodes.TYPE_META) carrying the creation and
      access information of the catalog
    """
    # pylint: disable=R0904
7 years ago
1 year ago
def __init__(self, debug: bool = False,
             sortsize: bool = False,
             arc: bool = False) -> None:
    """
    set up the node handler
    @debug: debug mode
    @sortsize: sort nodes by size
    @arc: handle archive
    """
    # files are hashed by default, see do_hashing
    self.hash = True
    self.sortsize = sortsize
    self.debug = debug
    self.arc = arc
    # the decompressor only exists when archives are handled
    if arc:
        self.decomp = Decomp()
    self.csv_printer = CsvPrinter()
    self.native_printer = NativePrinter()
7 years ago
2 years ago
@staticmethod
def get_storage_names(top: NodeTop) -> List[str]:
2 years ago
"""return a list of all storage names"""
7 years ago
return [x.name for x in list(top.children)]
4 months ago
def find_storage_node_by_name(self, top: NodeTop,
                              name: str) -> Optional[NodeStorage]:
    """
    find a storage node by name
    @top: top node, only its direct children are inspected
    @name: name of the storage to look for
    returns the first storage node with that name or None
    """
    candidates = (child for child in top.children
                  if child.type == nodes.TYPE_STORAGE
                  and child.name == name)
    for storage in candidates:
        return cast(NodeStorage, storage)
    return None
def update_storage_path(self, top: NodeTop,
                        name: str,
                        newpath: str) -> None:
    """
    refresh the disk usage of a storage node on update
    @top: top node
    @name: name of the storage to refresh
    @newpath: path under which the storage is reachable

    The free space, total space and timestamp of the storage are
    updated. Nothing is done when the storage is not found, when
    @newpath is empty or when it does not exist.
    """
    storage = self.find_storage_node_by_name(top, name)
    if not storage or not newpath or not os.path.exists(newpath):
        return
    # query the filesystem once so that free and total
    # come from the same snapshot
    usage = shutil.disk_usage(newpath)
    storage.free = usage.free
    storage.total = usage.total
    storage.ts = int(time.time())
2 years ago
@staticmethod
def get_node(top: NodeTop,
             path: str,
             quiet: bool = False) -> Optional[NodeAny]:
    """
    get the node by internal tree path
    @top: node the lookup starts from
    @path: path of the node, only its basename is resolved
    @quiet: do not log an error when the node is not found
    returns the node or None if it cannot be resolved
    """
    bpath = ''
    resolver = anytree.resolver.Resolver('name')
    try:
        bpath = os.path.basename(path)
        found = resolver.get(top, bpath)
        typcast_node(found)
    except anytree.resolver.ChildResolverError:
        if not quiet:
            Logger.err(f'No node at path \"{bpath}\"')
        return None
    return cast(NodeAny, found)
1 year ago
def get_node_if_changed(self,
                        top: NodeTop,
                        path: str,
                        treepath: str) -> Tuple[Optional[NodeAny], bool]:
    """
    look up the indexed node of a path and tell if it has changed
    @top: top node (storage)
    @path: abs path to file
    @treepath: rel path from indexed directory
    returns a tuple (node or None, changed)
    """
    node = self.get_node(top, treepath.lstrip(os.sep), quiet=True)
    if not node:
        # node does not exist
        self._debug('\tchange: node does not exist')
        return None, True
    if os.path.isdir(path):
        # an existing directory is never reported as changed
        return node, False

    # modification time currently on the filesystem
    maccess = os.path.getmtime(path)
    if not (node.has_attr('maccess') and node.maccess):
        # force re-indexing if no maccess
        self._debug('\tchange: no maccess found')
        return node, True
    if float(maccess) != float(node.maccess):
        # maccess changed
        self._debug(f'\tchange: maccess changed for \"{path}\"')
        return node, True

    # same modification time, compare the checksums when
    # hashing is enabled and a checksum was indexed
    if self.hash and node.md5:
        md5 = self._get_hash(path)
        if md5 and md5 != node.md5:
            msg = f'\tchange: checksum changed for \"{path}\"'
            self._debug(msg)
            return node, True
    self._debug(f'\tchange: no change for \"{path}\"')
    return node, False
6 years ago
###############################################################
# public helpers
###############################################################
2 years ago
@staticmethod
def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
2 years ago
"""format the storage attr for saving"""
6 years ago
if not attr:
return ''
2 years ago
if isinstance(attr, list):
3 years ago
return ', '.join(attr)
if isinstance(attr, dict):
ret = []
for key, val in attr.items():
ret.append(f'{key}={val}')
return ', '.join(ret)
3 years ago
attr = attr.rstrip()
return attr
6 years ago
def do_hashing(self, val: bool) -> None:
    """
    enable or disable the hashing of files when indexing
    @val: True to hash files, False otherwise
    """
    self.hash = val
7 years ago
###############################################################
2 years ago
# node creation
7 years ago
###############################################################
def new_top_node(self) -> NodeTop:
    """create a new top node named after nodes.NAME_TOP"""
    node = NodeTop(nodes.NAME_TOP)
    self._debug(f'new top node: {node}')
    return node
7 years ago
1 year ago
def new_file_node(self, name: str, path: str,
                  parent: NodeAny) -> Optional[NodeFile]:
    """
    create a new node representing a file
    @name: name of the new node
    @path: path of the file on the filesystem
    @parent: parent of the new node
    returns the new node, None if the file does not exist or
    cannot be stat'ed
    """
    if not os.path.exists(path):
        Logger.err(f'File \"{path}\" does not exist')
        return None
    path = os.path.abspath(path)
    try:
        stat = os.lstat(path)
    except OSError as exc:
        Logger.err(f'OSError: {exc}')
        return None

    # the checksum stays empty when hashing is disabled
    md5 = self._get_hash(path) if self.hash else ''
    node = NodeFile(name,
                    stat.st_size,
                    md5,
                    os.path.getmtime(path),
                    parent=parent)
    if not self.arc:
        return node

    # archive handling: the names found in a supported archive
    # are added below the file node
    ext = os.path.splitext(path)[1][1:]
    if ext.lower() not in self.decomp.get_formats():
        self._debug(f'{path} is NOT an archive')
        return node
    self._debug(f'{path} is an archive')
    names = self.decomp.get_names(path)
    self.list_to_tree(node, names)
    return node
7 years ago
1 year ago
def new_dir_node(self, name: str, path: str,
                 parent: NodeAny) -> NodeDir:
    """
    create a new node representing a directory
    @name: name of the new node
    @path: path of the directory on the filesystem
    @parent: parent of the new node
    """
    abspath = os.path.abspath(path)
    mtime = os.path.getmtime(abspath)
    # the node is created with 0 as its second argument
    return NodeDir(name, 0, mtime, parent=parent)
2 years ago
1 year ago
def new_storage_node(self, name: str,
                     path: str,
                     parent: str,
                     attrs: Dict[str, Any]) \
        -> NodeStorage:
    """
    create a new node representing a storage
    @name: name of the storage
    @path: path of the storage on the filesystem
    @parent: parent of the new node, handed over to NodeStorage
             as is (annotated as str - TODO confirm, looks like
             a node is expected)
    @attrs: storage attributes, formatted with attrs_to_string
    """
    path = os.path.abspath(path)
    # query the filesystem once so that free and total
    # come from the same snapshot
    usage = shutil.disk_usage(path)
    epoch = int(time.time())
    return NodeStorage(name,
                       usage.free,
                       usage.total,
                       0,
                       epoch,
                       self.attrs_to_string(attrs),
                       parent=parent)
2 years ago
def new_archive_node(self,
                     name: str,
                     parent: str,
                     archive: str) -> NodeArchived:
    """
    create a new node for archive data
    @name: name of the new node
    @parent: parent of the new node
    @archive: value passed as the archive of the new node
    returns the new node, created with a size of 0 and an empty md5
    """
    node = NodeArchived(name=name, parent=parent,
                        nodesize=0, md5='', archive=archive)
    return node
2 years ago
###############################################################
# node management
###############################################################
def update_metanode(self, top: NodeTop) -> NodeMeta:
    """
    create or update meta node information
    @top: top node
    returns the meta node with refreshed access information;
    a newly created meta node is not given a parent here
    """
    meta = self._get_meta_node(top)
    now = int(time.time())
    if not meta:
        # first time: also record the creation information
        created: Dict[str, Any] = {'created': now,
                                   'created_version': VERSION}
        meta = NodeMeta(name=nodes.NAME_META, attr=created)
    meta.attr['access'] = now
    meta.attr['access_version'] = VERSION
    return meta
def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
2 years ago
"""return the meta node if any"""
try:
1 year ago
found = next(filter(lambda x: x.type == nodes.TYPE_META,
top.children))
return cast(NodeMeta, found)
2 years ago
except StopIteration:
return None
7 years ago
def clean_not_flagged(self, top: NodeTop) -> int:
    """
    remove any file or directory node not flagged and clean flags
    @top: node the walk starts from
    returns the number of removed nodes
    """
    removed = 0
    for node in anytree.PreOrderIter(top):
        typcast_node(node)
        # only files and directories are subject to cleaning
        cleanable = node.type in [nodes.TYPE_DIR, nodes.TYPE_FILE]
        if cleanable and self._clean(node):
            removed += 1
    return removed
def _clean(self, node: NodeAny) -> bool:
2 years ago
"""remove node if not flagged"""
if not node.flagged():
node.parent = None
return True
node.unflag()
return False
7 years ago
###############################################################
# printing
###############################################################
4 months ago
def _print_node_csv(self, node: NodeAny,
                    sep: str = ',',
                    raw: bool = False) -> None:
    """
    print a node to csv
    @node: the node to consider
    @sep: CSV separator character
    @raw: print raw size rather than human readable
    """
    typcast_node(node)
    # nothing is printed for a missing node or for the top node
    if not node or node.type == nodes.TYPE_TOP:
        return
    if node.type == nodes.TYPE_STORAGE:
        printer = self.csv_printer.print_storage
    else:
        printer = self.csv_printer.print_node
    printer(node, sep=sep, raw=raw)
4 months ago
def _print_node_du(self, node: NodeAny,
                   raw: bool = False) -> None:
    """
    print node du style
    @node: node to start from, the directory nodes of its tree
           are printed
    @raw: print raw size rather than human readable
    """
    typcast_node(node)
    for entry in self._get_entire_tree(node, dironly=True):
        self.native_printer.print_du(entry, raw=raw)
def _print_node_native(self, node: NodeAny,
                       pre: str = '',
                       withpath: bool = False,
                       withnbchildren: bool = False,
                       withstorage: bool = False,
                       raw: bool = False) -> None:
    """
    print a node
    @node: the node to print
    @pre: string to print before node
    @withpath: print the node path
    @withnbchildren: print the node nb children
    @withstorage: print the node storage it belongs to
    @raw: print raw size rather than human readable
    """
    typcast_node(node)
    printer = self.native_printer
    kind = node.type
    if kind == nodes.TYPE_TOP:
        # top node
        printer.print_top(pre, node.get_name())
        return
    if kind == nodes.TYPE_FILE:
        # node of type file
        printer.print_file(pre, node,
                           withpath=withpath,
                           withstorage=withstorage,
                           raw=raw)
        return
    if kind == nodes.TYPE_DIR:
        # node of type directory
        printer.print_dir(pre,
                          node,
                          withpath=withpath,
                          withstorage=withstorage,
                          withnbchildren=withnbchildren,
                          raw=raw)
        return
    if kind == nodes.TYPE_STORAGE:
        # node of type storage
        printer.print_storage(pre,
                              node,
                              raw=raw)
        return
    if kind == nodes.TYPE_ARCHIVED:
        # archive node, only printed when archives are handled
        if self.arc:
            printer.print_archive(pre, node.name, node.archive)
        return
    Logger.err(f'bad node encountered: {node}')
7 years ago
def print_tree(self, node: NodeAny,
               fmt: str = 'native',
               raw: bool = False) -> None:
    """
    print the tree in different format
    @node: start node
    @fmt: output format: 'native', 'csv' or 'csv-with-header',
          nothing is printed for any other value
    @raw: print the raw size rather than human readable
    """
    if fmt == 'native':
        # "tree" style
        tree = anytree.RenderTree(node, childiter=self._sort_tree)
        for prefix, _, entry in tree:
            self._print_node_native(entry, pre=prefix,
                                    withnbchildren=True, raw=raw)
        return
    if fmt not in ('csv', 'csv-with-header'):
        return
    # csv output, optionally preceded by the header
    if fmt == 'csv-with-header':
        self.csv_printer.print_header()
    self._print_nodes_csv(node, raw=raw)
4 months ago
def _print_nodes_csv(self, node: NodeAny,
                     raw: bool = False) -> None:
    """
    print the tree to csv
    @node: start node
    @raw: print the raw size rather than human readable
    """
    tree = anytree.RenderTree(node, childiter=self._sort_tree)
    for _pre, _fill, entry in tree:
        self._print_node_csv(entry, raw=raw)
2 years ago
@staticmethod
def _fzf_prompt(strings: Any) -> Any:
    """
    prompt with fzf
    @strings: the choices handed over to the fzf prompt
    returns what the prompt returned, None if pyfzf is not installed
    """
    try:
        # imported here so that pyfzf is only needed when used
        from pyfzf.pyfzf import FzfPrompt  # pylint: disable=C0415 # noqa
        return FzfPrompt().prompt(strings)
    except ModuleNotFoundError:
        Logger.err('install pyfzf to use fzf')
        return None
def _to_fzf(self, node: NodeAny, fmt: str) -> None:
    """
    fzf prompt with list and print selected node(s)
    @node: node to start with
    @fmt: output format for selected nodes

    Nothing is printed when the prompt returns nothing, which
    includes the case where pyfzf is not installed.
    """
    rendered = anytree.RenderTree(node, childiter=self._sort_tree)
    # construct node names list: displayed path -> node
    the_nodes = {}
    for _, _, rend in rendered:
        if not rend:
            continue
        parents = rend.get_fullpath()
        storage = rend.get_storage_node()
        fullpath = os.path.join(storage.get_name(), parents)
        the_nodes[fullpath] = rend
    # prompt with fzf
    paths = self._fzf_prompt(the_nodes.keys())
    if not paths:
        # _fzf_prompt returns None when pyfzf is missing,
        # iterating over it would raise a TypeError
        return
    # print the resulting tree
    subfmt = fmt.replace('fzf-', '')
    for path in paths:
        if not path or path not in the_nodes:
            continue
        self.print_tree(the_nodes[path], fmt=subfmt)
2 years ago
@staticmethod
def to_dot(top: NodeTop,
           path: str = 'tree.dot') -> str:
    """
    export to dot for graphing
    @top: node to export
    @path: path of the dot file to write
    returns a command line turning the dot file into a png
    """
    exporter = anytree.exporter.DotExporter(top)
    exporter.to_dotfile(path)
    Logger.info(f'dot file created under \"{path}\"')
    cmd = f'dot {path} -T png -o /tmp/tree.png'
    return cmd
6 years ago
7 years ago
###############################################################
# searching
###############################################################
def find(self, top: NodeTop,
         key: str,
         script: bool = False,
         only_dir: bool = False,
         startnode: Optional[NodeAny] = None,
         fmt: str = 'native',
         raw: bool = False) -> List[NodeAny]:
    """
    find files based on their names
    @top: top node
    @key: term to search for
    @script: output script
    @only_dir: only search for directories
    @startnode: where to start the search, resolved with get_node
                (falls back to @top when empty)
    @fmt: output format
    @raw: raw size output
    returns the found nodes
    """
    self._debug(f'searching for \"{key}\"')

    # search for nodes based on path
    start: Optional[NodeAny] = top
    if startnode:
        start = self.get_node(top, startnode)
        if start is None:
            # get_node has logged the error, there is
            # nothing to search in
            return []
    filterfunc = self._callback_find_name(key, only_dir)
    found = anytree.findall(start, filter_=filterfunc)
    self._debug(f'found {len(found)} node(s)')

    # compile found nodes: full path -> node
    paths = {}
    for item in found:
        typcast_node(item)
        item.set_name(item.get_name())
        paths[item.get_fullpath()] = item

    # handle fzf mode
    if fmt.startswith('fzf'):
        # the prompt returns None when pyfzf is not installed
        selected = self._fzf_prompt(paths.keys()) or []
        newpaths = {}
        subfmt = fmt.replace('fzf-', '')
        for path in selected:
            if path not in paths:
                continue
            newpaths[path] = paths[path]
            self.print_tree(newpaths[path], fmt=subfmt)
        paths = newpaths
    elif fmt == 'native':
        for item in paths.values():
            self._print_node_native(item,
                                    withpath=True,
                                    withnbchildren=True,
                                    withstorage=True,
                                    raw=raw)
    elif fmt.startswith('csv'):
        if fmt == 'csv-with-header':
            self.csv_printer.print_header()
        for item in paths.values():
            self._print_node_csv(item, raw=raw)

    # execute script if any
    if script:
        tmp = ['${source}/' + x for x in paths]
        tmpstr = ' '.join(tmp)
        cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
        Logger.info(cmd)

    return list(paths.values())
1 year ago
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
    """
    callback for finding files
    @term: term to search for, either contained in the full path
           of a node or an fnmatch pattern matching it; an empty
           term matches every candidate
    @only_dir: only match directory nodes
    returns a function taking a node and returning True on match
    """
    def find_name(node: NodeAny) -> bool:
        typcast_node(node)
        path = node.get_fullpath()
        if node.type == nodes.TYPE_STORAGE:
            # ignore storage nodes
            return False
        if node.type == nodes.TYPE_TOP:
            # ignore top nodes
            return False
        if node.type == nodes.TYPE_META:
            # ignore meta nodes
            return False
        if only_dir and node.type != nodes.TYPE_DIR:
            # ignore non directory
            return False

        # filter
        if not term:
            return True
        if term in path:
            return True
        if self.debug:
            Logger.debug(f'match \"{path}\" with \"{term}\"')
        if fnmatch.fnmatch(path, term):
            return True

        # ignore
        return False
    return find_name
7 years ago
4 months ago
###############################################################
# fixsizes
###############################################################
def fixsizes(self, top: NodeTop) -> None:
    """
    fix node sizes
    @top: node to start from, the nodesize of every node of its
          tree is set to the value of its get_rec_size()
    """
    typcast_node(top)
    for _pre, _fill, entry in anytree.RenderTree(top):
        typcast_node(entry)
        entry.nodesize = entry.get_rec_size()
7 years ago
###############################################################
2 years ago
# ls
7 years ago
###############################################################
def list(self, top: NodeTop,
         path: str,
         rec: bool = False,
         fmt: str = 'native',
         raw: bool = False) -> List[NodeAny]:
    """
    list nodes for "ls"
    @top: top node
    @path: path to search for, may contain the '*' and '?' wildcards
    @rec: recursive walk
    @fmt: output format
    @raw: print raw size
    returns the found nodes, an empty list if there are none
    """
    self._debug(f'ls walking path: \"{path}\" from \"{top.get_name()}\"')
    resolver = anytree.resolver.Resolver('name')
    found = []
    try:
        has_wildcard = '*' in path or '?' in path
        if has_wildcard:
            # we need to handle glob
            self._debug('glob ls...')
            found = resolver.glob(top, path)
        else:
            # we have a canonical path
            self._debug('get ls...')
            single = cast(NodeAny, resolver.get(top, path))
            typcast_node(single)
            if single and single.may_have_children():
                # let's find its children as well
                found = resolver.glob(top, os.path.join(path, '*'))
            else:
                found = [single]

        if not found:
            # nothing found
            self._debug('nothing found')
            return []

        if rec:
            # print the entire tree
            self.print_tree(found[0].parent, fmt=fmt, raw=raw)
            return found

        # sort found nodes
        found = sorted(found, key=os_sort_keygen(self._sort))

        # print all found nodes
        if fmt == 'csv-with-header':
            self.csv_printer.print_header()
        for entry in found:
            if fmt == 'native':
                self._print_node_native(entry,
                                        withpath=True,
                                        withnbchildren=True,
                                        raw=raw)
            elif fmt.startswith('csv'):
                self._print_node_csv(entry, raw=raw)
            elif fmt.startswith('fzf'):
                self._to_fzf(entry, fmt)

    except anytree.resolver.ChildResolverError:
        # the path could not be resolved: return what was
        # found so far
        pass
    return found
4 months ago
###############################################################
# du
###############################################################
4 months ago
def diskusage(self, top: NodeTop,
              path: str,
              raw: bool = False) -> List[NodeAny]:
    """
    disk usage
    @top: top node
    @path: path of the node to report on
    @raw: print raw size rather than human readable
    returns the resolved node after printing its disk usage, an
    empty list when nothing is found at @path

    NOTE(review): the return annotation is a list while the
    resolved node itself is returned on success; kept as is for
    the callers - TODO confirm what they expect
    """
    self._debug(f'du walking path: \"{path}\" from \"{top.get_name()}\"')
    resolv = anytree.resolver.Resolver('name')
    try:
        # we have a canonical path
        self._debug('get du...')
        found = resolv.get(top, path)
    except anytree.resolver.ChildResolverError:
        # unknown path: report it like an empty result instead of
        # returning a variable that was never assigned
        self._debug('nothing found')
        return []
    if not found:
        # nothing found
        self._debug('nothing found')
        return []
    self._debug(f'du found: {found}')
    self._print_node_du(found, raw=raw)
    return found
###############################################################
4 years ago
# tree creation
###############################################################
1 year ago
def _add_entry(self, name: str,
top: NodeTop,
1 year ago
resolv: Any) -> None:
2 years ago
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
3 months ago
self.new_archive_node(name, top, top.get_name())
return
sub = os.sep.join(entries[:-1])
2 years ago
nodename = entries[-1]
try:
parent = resolv.get(top, sub)
3 months ago
parent = self.new_archive_node(nodename, parent, top.get_name())
except anytree.resolver.ChildResolverError:
3 months ago
self.new_archive_node(nodename, top, top.get_name())
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
    """
    convert list of files to a tree
    @parent: node below which the entries are added
    @names: paths of the entries, nothing is done if empty
    """
    if not names:
        return
    resolver = anytree.resolver.Resolver('name')
    for entry in names:
        self._add_entry(entry.rstrip(os.sep), parent, resolver)
7 years ago
###############################################################
# diverse
###############################################################
4 months ago
def _get_entire_tree(self, start: NodeAny,
                     dironly: bool = False) -> List[NodeAny]:
    """
    get entire tree and sort it
    @start: node to start from
    @dironly: only keep the directory nodes
    returns the sorted list of the nodes
    """
    typcast_node(start)
    tree = anytree.RenderTree(start)
    if not dironly:
        collected = [entry for _, _, entry in tree]
    else:
        collected = []
        for _, _, entry in tree:
            typcast_node(entry)
            if entry.type != nodes.TYPE_DIR:
                continue
            collected.append(entry)
    return sorted(collected, key=os_sort_keygen(self._sort))
1 year ago
def _sort_tree(self,
items: List[NodeAny]) -> List[NodeAny]:
2 years ago
"""sorting a list of items"""
return sorted(items, key=self._sort, reverse=self.sortsize)
7 years ago
def _sort(self, lst: NodeAny) -> Any:
2 years ago
"""sort a list"""
if self.sortsize:
return self._sort_size(lst)
return self._sort_fs(lst)
2 years ago
@staticmethod
4 months ago
def _sort_fs(node: NodeAny) -> str:
4 months ago
"""sort by name"""
# to sort by types then name
4 months ago
return str(node.name)
7 years ago
2 years ago
@staticmethod
def _sort_size(node: NodeAny) -> float:
2 years ago
"""sorting nodes by size"""
try:
if not node.nodesize:
return 0
return float(node.nodesize)
except AttributeError:
return 0
2 years ago
@staticmethod
def _get_hash(path: str) -> str:
    """
    return md5 hash of node
    @path: path handed over to md5sum
    returns the hash, an empty string if md5sum raised a
    CatcliException (the error is logged)
    """
    digest = ''
    try:
        digest = md5sum(path)
    except CatcliException as exc:
        Logger.err(str(exc))
    return digest
1 year ago
def _debug(self, string: str) -> None:
2 years ago
"""print debug"""
if not self.debug:
return
4 years ago
Logger.debug(string)