#!/usr/bin/env python
# cardinal_pythonlib/ui.py
"""
===============================================================================
Original code copyright (C) 2009-2022 Rudolf Cardinal (rudolf@pobox.com).
This file is part of cardinal_pythonlib.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
===============================================================================
**File operations.**
"""
from contextlib import contextmanager
import fnmatch
import glob
import os
import shutil
import stat
from types import TracebackType
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple
from cardinal_pythonlib.logs import get_brace_style_log_with_null_handler
log = get_brace_style_log_with_null_handler(__name__)
# =============================================================================
# Find or require executables
# =============================================================================
[docs]def which_with_envpath(executable: str, env: Dict[str, str]) -> str:
"""
Performs a :func:`shutil.which` command using the PATH from the specified
environment.
Reason: when you use ``run([executable, ...], env)`` and therefore
``subprocess.run([executable, ...], env=env)``, the PATH that's searched
for ``executable`` is the parent's, not the new child's -- so you have to
find the executable manually.
Args:
executable: executable to find
env: environment to fetch the PATH variable from
"""
oldpath = os.environ.get("PATH", "")
os.environ["PATH"] = env.get("PATH")
which = shutil.which(executable)
os.environ["PATH"] = oldpath
return which
_MISSING_COMMAND = "Missing command (must be on the PATH): "
[docs]def require_executable(executable: str) -> None:
"""
If ``executable`` is not found by :func:`shutil.which`, raise
:exc:`FileNotFoundError`.
"""
if shutil.which(executable):
return
errmsg = _MISSING_COMMAND + executable
log.critical(errmsg)
raise FileNotFoundError(errmsg)
[docs]def which_and_require(executable: str, fullpath: bool = False) -> str:
"""
Ensures that ``executable`` is on the path, and returns it (or its full
path via :func:`shutil.which`).
"""
w = shutil.which(executable)
if w:
return w if fullpath else executable
errmsg = _MISSING_COMMAND + executable
log.critical(errmsg)
raise FileNotFoundError(errmsg)
# =============================================================================
# Create directories
# =============================================================================
[docs]def mkdir_p(path: str) -> None:
"""
Makes a directory, and any intermediate (parent) directories if required.
This is the UNIX ``mkdir -p DIRECTORY`` command; of course, we use
:func:`os.makedirs` instead, for portability.
"""
log.debug("mkdir -p " + path)
os.makedirs(path, exist_ok=True)
# =============================================================================
# Change directories
# =============================================================================
[docs]@contextmanager
def pushd(directory: str) -> None:
"""
Context manager: changes directory and preserves the original on exit.
Example:
.. code-block:: python
with pushd(new_directory):
# do things
"""
previous_dir = os.getcwd()
os.chdir(directory)
yield
os.chdir(previous_dir)
[docs]def preserve_cwd(func: Callable) -> Callable:
"""
Decorator to preserve the current working directory in calls to the
decorated function.
Example:
.. code-block:: python
@preserve_cwd
def myfunc():
os.chdir("/faraway")
os.chdir("/home")
myfunc()
assert os.getcwd() == "/home"
"""
# https://stackoverflow.com/questions/169070/python-how-do-i-write-a-decorator-that-restores-the-cwd # noqa: E501
def decorator(*args_, **kwargs) -> Any:
cwd = os.getcwd()
result = func(*args_, **kwargs)
os.chdir(cwd)
return result
return decorator
[docs]def root_path() -> str:
"""
Returns the system root directory.
"""
# https://stackoverflow.com/questions/12041525
return os.path.abspath(os.sep)
# =============================================================================
# Copy or move things
# =============================================================================
[docs]def copyglob(
src: str,
dest: str,
allow_nothing: bool = False,
allow_nonfiles: bool = False,
) -> None:
"""
Copies files whose filenames match the glob src" into the directory
"dest". Raises an error if no files are copied, unless allow_nothing is
True.
Args:
src: source glob (e.g. ``/somewhere/*.txt``)
dest: destination directory
allow_nothing: don't raise an exception if no files are found
allow_nonfiles: copy things that are not files too (as judged by
:func:`os.path.isfile`).
Raises:
ValueError: if no files are found and ``allow_nothing`` is not set
"""
something = False
for filename in glob.glob(src):
if allow_nonfiles or os.path.isfile(filename):
shutil.copy(filename, dest)
something = True
if something or allow_nothing:
return
raise ValueError(f"No files found matching: {src}")
[docs]def moveglob(
src: str,
dest: str,
allow_nothing: bool = False,
allow_nonfiles: bool = False,
) -> None:
"""
As for :func:`copyglob`, but moves instead.
"""
something = False
for filename in glob.glob(src):
if allow_nonfiles or os.path.isfile(filename):
shutil.move(filename, dest)
something = True
if something or allow_nothing:
return
raise ValueError(f"No files found matching: {src}")
[docs]def copy_tree_root(src_dir: str, dest_parent: str) -> None:
"""
Copies a directory ``src_dir`` into the directory ``dest_parent``.
That is, with a file structure like:
.. code-block:: none
/source/thing/a.txt
/source/thing/b.txt
/source/thing/somedir/c.txt
the command
.. code-block:: python
copy_tree_root("/source/thing", "/dest")
ends up creating
.. code-block:: none
/dest/thing/a.txt
/dest/thing/b.txt
/dest/thing/somedir/c.txt
"""
dirname = os.path.basename(os.path.normpath(src_dir))
dest_dir = os.path.join(dest_parent, dirname)
shutil.copytree(src_dir, dest_dir)
[docs]def copy_tree_contents(
srcdir: str, destdir: str, destroy: bool = False
) -> None:
"""
Recursive copy. Unlike :func:`copy_tree_root`, :func:`copy_tree_contents`
works as follows. With the file structure:
.. code-block:: none
/source/thing/a.txt
/source/thing/b.txt
/source/thing/somedir/c.txt
the command
.. code-block:: python
copy_tree_contents("/source/thing", "/dest")
ends up creating:
.. code-block:: none
/dest/a.txt
/dest/b.txt
/dest/somedir/c.txt
"""
log.info("Copying directory {} -> {}", srcdir, destdir)
if os.path.exists(destdir):
if not destroy:
raise ValueError("Destination exists!")
if not os.path.isdir(destdir):
raise ValueError("Destination exists but isn't a directory!")
log.debug("... removing old contents")
rmtree(destdir)
log.debug("... now copying")
shutil.copytree(srcdir, destdir)
# =============================================================================
# Delete things
# =============================================================================
[docs]def rmglob(pattern: str) -> None:
"""
Deletes all files whose filename matches the glob ``pattern`` (via
:func:`glob.glob`).
"""
for f in glob.glob(pattern):
os.remove(f)
[docs]def purge(path: str, pattern: str) -> None:
"""
Deletes all files in ``path`` matching ``pattern`` (via
:func:`fnmatch.fnmatch`).
"""
for f in find(pattern, path):
log.info("Deleting {}", f)
os.remove(f)
[docs]def delete_files_within_dir(directory: str, filenames: List[str]) -> None:
"""
Delete files within ``directory`` whose filename *exactly* matches one of
``filenames``.
"""
for dirpath, dirnames, fnames in os.walk(directory):
for f in fnames:
if f in filenames:
fullpath = os.path.join(dirpath, f)
log.debug("Deleting {!r}", fullpath)
os.remove(fullpath)
EXC_INFO_TYPE = Tuple[
Optional[Any], # Type[BaseException]], but that's not in Python 3.5
Optional[BaseException],
Optional[TracebackType], # it's a traceback object
]
# https://docs.python.org/3/library/sys.html#sys.exc_info
[docs]def shutil_rmtree_onerror(
func: Callable[[str], None], path: str, exc_info: EXC_INFO_TYPE
) -> None:
"""
Error handler for ``shutil.rmtree``.
If the error is due to an access error (read only file)
it attempts to add write permission and then retries.
If the error is for another reason it re-raises the error.
Usage: ``shutil.rmtree(path, onerror=shutil_rmtree_onerror)``
See
https://stackoverflow.com/questions/2656322/shutil-rmtree-fails-on-windows-with-access-is-denied
"""
if not os.access(path, os.W_OK):
# Is the error an access error ?
os.chmod(path, stat.S_IWUSR)
func(path)
else:
exc = exc_info[1]
raise exc
[docs]def rmtree(directory: str) -> None:
"""
Deletes a directory tree.
"""
log.debug("Deleting directory {!r}", directory)
shutil.rmtree(directory, onerror=shutil_rmtree_onerror)
# =============================================================================
# Change ownership or permissions
# =============================================================================
[docs]def chown_r(path: str, user: str, group: str) -> None:
"""
Performs a recursive ``chown``.
Args:
path: path to walk down
user: user name or ID
group: group name or ID
As per https://stackoverflow.com/questions/2853723
"""
for root, dirs, files in os.walk(path):
for x in dirs:
shutil.chown(os.path.join(root, x), user, group)
for x in files:
shutil.chown(os.path.join(root, x), user, group)
[docs]def chmod_r(root: str, permission: int) -> None:
"""
Recursive ``chmod``.
Args:
root: directory to walk down
permission: e.g. ``e.g. stat.S_IWUSR``
"""
os.chmod(root, permission)
for dirpath, dirnames, filenames in os.walk(root):
for d in dirnames:
os.chmod(os.path.join(dirpath, d), permission)
for f in filenames:
os.chmod(os.path.join(dirpath, f), permission)
# =============================================================================
# Find files
# =============================================================================
[docs]def find(pattern: str, path: str) -> List[str]:
"""
Finds files in ``path`` whose filenames match ``pattern`` (via
:func:`fnmatch.fnmatch`).
"""
result = []
for root, dirs, files in os.walk(path):
for name in files:
if fnmatch.fnmatch(name, pattern):
result.append(os.path.join(root, name))
return result
[docs]def find_first(pattern: str, path: str) -> str:
"""
Finds first file in ``path`` whose filename matches ``pattern`` (via
:func:`fnmatch.fnmatch`), or raises :exc:`IndexError`.
"""
try:
return find(pattern, path)[0]
except IndexError:
log.critical('''Couldn't find "{}" in "{}"''', pattern, path)
raise
[docs]def gen_filenames(
starting_filenames: List[str], recursive: bool
) -> Generator[str, None, None]:
"""
From a starting list of files and/or directories, generates filenames of
all files in the list, and (if ``recursive`` is set) all files within
directories in the list.
Args:
starting_filenames: files and/or directories
recursive: walk down any directories in the starting list, recursively?
Yields:
each filename
"""
for base_filename in starting_filenames:
if os.path.isfile(base_filename):
yield os.path.abspath(base_filename)
elif os.path.isdir(base_filename) and recursive:
for dirpath, dirnames, filenames in os.walk(base_filename):
for fname in filenames:
yield os.path.abspath(os.path.join(dirpath, fname))
# =============================================================================
# Check lock status
# =============================================================================
[docs]def exists_locked(filepath: str) -> Tuple[bool, bool]:
"""
Checks if a file is locked by opening it in append mode.
(If no exception is thrown in that situation, then the file is not locked.)
Args:
filepath: file to check
Returns:
tuple: ``(exists, locked)``
See https://www.calazan.com/how-to-check-if-a-file-is-locked-in-python/.
"""
exists = False
locked = None
file_object = None
if os.path.exists(filepath):
exists = True
locked = True
try:
buffer_size = 8
# Opening file in append mode and read the first 8 characters.
file_object = open(filepath, "a", buffer_size)
if file_object:
locked = False # exists and not locked
except IOError:
pass
finally:
if file_object:
file_object.close()
return exists, locked
# =============================================================================
# Filename/path processing
# =============================================================================
[docs]def relative_filename_within_dir(filename: str, directory: str) -> str:
"""
Starting with a (typically absolute) ``filename``, returns the part of the
filename that is relative to the directory ``directory``.
If the file is *not* within the directory, returns an empty string.
"""
filename = os.path.abspath(filename)
directory = os.path.abspath(directory)
if os.path.commonpath([directory, filename]) != directory:
# Filename is not within directory
return ""
return os.path.relpath(filename, start=directory)
# =============================================================================
# Disk space
# =============================================================================
[docs]def get_directory_contents_size(directory: str = ".") -> int:
"""
Returns the total size of all files within a directory.
See
https://stackoverflow.com/questions/1392413/calculating-a-directorys-size-using-python.
Args:
directory: directory to check
Returns:
int: size in bytes
"""
total_size = 0
for dirpath, dirnames, filenames in os.walk(directory):
for f in filenames:
fp = os.path.join(dirpath, f)
# skip if it is symbolic link
if not os.path.islink(fp):
total_size += os.path.getsize(fp)
return total_size
# =============================================================================
# Concatenation
# =============================================================================
[docs]def concatenate(src: List[str], dest: str, filesep: str = os.linesep):
"""
Concatenate multiple text files into one.
"""
with open(dest, "wt") as outfile:
for i, sf in enumerate(src):
if i > 0:
outfile.write(filesep)
with open(sf) as infile:
shutil.copyfileobj(infile, outfile)
# =============================================================================
# Watching for changes
# =============================================================================
[docs]class FileWatcher:
"""
Watch several files for changes.
"""
def __init__(self, filenames: List[str]) -> None:
"""
Initialize with a list of filenames to watch.
"""
self._filenames = filenames
self._when_modified = {f: os.path.getmtime(f) for f in self._filenames}
[docs] def changed(self) -> List[str]:
"""
Returns a list of filenames that have changed (since class instance
creation or last call to this function).
"""
changed_filenames = [] # type: List[str]
for f in self._filenames:
old_time = self._when_modified[f]
new_time = os.path.getmtime(f)
if new_time > old_time:
log.info(f"Change detected in {f}")
changed_filenames.append(f)
self._when_modified[f] = new_time
return changed_filenames