# Source code for cardinal_pythonlib.openxml.grep_in_openxml

#!/usr/bin/env python3
# cardinal_pythonlib/openxml/grep_in_openxml.py

"""
===============================================================================

    Original code copyright (C) 2009-2022 Rudolf Cardinal (rudolf@pobox.com).

    This file is part of cardinal_pythonlib.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        https://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

===============================================================================

**Performs a grep (global-regular-expression-print) search of files in OpenXML
format, which is to say inside ZIP files. See the command-line help for
details.**

Version history:

- Written 28 Sep 2017.

Notes:

- Use the ``vbindiff`` tool to show *how* two binary files differ.

"""

from argparse import ArgumentParser
from enum import Enum
import logging
import multiprocessing
import os
import re
from sys import argv, getdefaultencoding, stdin
from typing import Optional, Union
from xml.etree import ElementTree
from zipfile import BadZipFile, ZipFile
import zlib

from rich_argparse import RawDescriptionRichHelpFormatter

from cardinal_pythonlib.logs import (
    main_only_quicksetup_rootlogger,
)
from cardinal_pythonlib.fileops import gen_filenames

log = logging.getLogger(__name__)


class GrepSearchSubstrate(Enum):
    """
    What part of each inner file (within the OpenXML/ZIP file) is searched.
    """

    # Text of the XML nodes of each inner file.
    XML_TEXT = 1
    # Raw contents of each inner file.
    RAW_TEXT = 2
    # The name of each inner file, rather than its contents.
    INNER_FILENAME = 3
class GrepReportContent(Enum):
    """
    What is reported for each OpenXML/ZIP file that is searched.
    """

    # Content that matches the pattern.
    CONTENTS_MATCHING = 1
    # Content that does not match the pattern.
    CONTENTS_NOT_MATCHING = 2
    # Names of files that contain a match.
    FILENAMES_MATCHING = 3
    # Names of files that contain no match.
    FILENAMES_NOT_MATCHING = 4
class GrepMode:
    """
    Configuration for a grep-style search of OpenXML (ZIP) files: the pattern,
    where to search for it, what to report, and how to display hits.
    """

    def __init__(
        self,
        pattern: str,
        ignore_case: bool = False,
        search_mode: Optional[GrepSearchSubstrate] = None,
        search_raw_text: bool = False,
        search_inner_filename: bool = False,
        report_mode: Optional[GrepReportContent] = None,
        report_invert_match: bool = False,
        report_files_with_matches: bool = False,
        report_files_without_match: bool = False,
        display_no_filename: bool = False,
        display_inner_filename: bool = False,
    ) -> None:
        """
        Args:
            pattern:
                What pattern to search for?
            ignore_case:
                Use a case-insensitive search.

            search_mode:
                Where to search? Specify an enum-based search mode directly.
            search_raw_text:
                Boolean flag alternative to search_mode. Search raw text
                (rather than the default of XML node text)? (Cannot be
                combined with search_mode, or search_inner_filename.)
            search_inner_filename:
                Boolean flag alternative to search_mode. Search inner filename
                (rather than the default of XML node text)? (Cannot be
                combined with search_mode, or search_raw_text.)

            report_mode:
                How to report? Specify an enum-based report mode directly.
            report_invert_match:
                Boolean flag alternative to report_mode. Inverts grep-like
                behaviour, reporting lines that do not match. (Cannot be
                combined with report_files_without_match or
                report_files_with_matches.)
            report_files_with_matches:
                Boolean flag alternative to report_mode. Show filenames of
                files with matches. (Cannot be combined with invert_match or
                report_files_without_match.)
            report_files_without_match:
                Boolean flag alternative to report_mode. Show filenames of
                files without matches. (Cannot be combined with invert_match
                or report_files_with_matches.)

            display_no_filename:
                For hits, omit the filename of the OpenXML (ZIP) file.
            display_inner_filename:
                For hits, show the filenames of inner files, within each
                OpenXML (ZIP) file.

        Raises:
            ValueError:
                if mutually exclusive search or report options are combined.
            re.error:
                if the pattern is not a valid regular expression.
        """
        # self.search_mode: what to search
        self.search_mode = self._resolve_search_mode(
            search_mode=search_mode,
            search_raw_text=search_raw_text,
            search_inner_filename=search_inner_filename,
        )

        # self.regex: what to search for
        self.pattern = pattern
        self.ignore_case = ignore_case
        final_pattern: Union[bytes, str]
        if self.use_byte_regex:
            # Raw content is read as bytes, so it needs a bytes regex.
            final_pattern = pattern.encode(getdefaultencoding())
        else:
            # XML node text and inner filenames are str.
            final_pattern = pattern
        flags = re.IGNORECASE if ignore_case else 0
        self.regex = re.compile(final_pattern, flags)

        # self.report_mode: what to report
        self.report_mode = self._resolve_report_mode(
            report_mode=report_mode,
            report_invert_match=report_invert_match,
            report_files_with_matches=report_files_with_matches,
            report_files_without_match=report_files_without_match,
        )
        # Derived from the resolved report mode (not just from the boolean
        # flag), so that it is also correct when report_mode is passed
        # directly as CONTENTS_NOT_MATCHING.
        self.invert_match = self.report_miss_lines

        # How to display hits
        self.display_no_filename = display_no_filename
        self.display_inner_filename = display_inner_filename

    @staticmethod
    def _resolve_search_mode(
        search_mode: Optional[GrepSearchSubstrate],
        search_raw_text: bool,
        search_inner_filename: bool,
    ) -> GrepSearchSubstrate:
        """
        Combine the enum-based and boolean-flag ways of choosing what to
        search into a single search mode, rejecting conflicting options.
        """
        if search_mode is not None:
            if search_raw_text or search_inner_filename:
                raise ValueError(
                    "Can't specify search_raw_text or search_inner_filename "
                    "if you specify search_mode"
                )
            return search_mode
        if search_raw_text and search_inner_filename:
            raise ValueError(
                "Can't specify both 'search_raw_text' and "
                "'search_inner_filename' options"
            )
        if search_raw_text:
            return GrepSearchSubstrate.RAW_TEXT
        if search_inner_filename:
            return GrepSearchSubstrate.INNER_FILENAME
        # Default if nothing is specified
        return GrepSearchSubstrate.XML_TEXT

    @staticmethod
    def _resolve_report_mode(
        report_mode: Optional[GrepReportContent],
        report_invert_match: bool,
        report_files_with_matches: bool,
        report_files_without_match: bool,
    ) -> GrepReportContent:
        """
        Combine the enum-based and boolean-flag ways of choosing what to
        report into a single report mode, rejecting conflicting options.
        """
        n_report_booleans = sum(
            [
                report_invert_match,
                report_files_with_matches,
                report_files_without_match,
            ]
        )
        if report_mode is not None:
            if n_report_booleans > 0:
                raise ValueError(
                    "Can't specify report_invert_match, "
                    "report_files_with_matches, or report_files_without_match "
                    "if you specify report_mode"
                )
            return report_mode
        if n_report_booleans > 1:
            raise ValueError(
                "Specify at most one of: report_invert_match, "
                "report_files_with_matches, report_files_without_match"
            )
        if report_invert_match:
            return GrepReportContent.CONTENTS_NOT_MATCHING
        if report_files_with_matches:
            return GrepReportContent.FILENAMES_MATCHING
        if report_files_without_match:
            return GrepReportContent.FILENAMES_NOT_MATCHING
        # Default if nothing is specified
        return GrepReportContent.CONTENTS_MATCHING

    def __repr__(self) -> str:
        return (
            f"GrepMode(pattern={self.pattern!r}, "
            f"ignore_case={self.ignore_case}, "
            f"search_mode={self.search_mode}, "
            f"report_mode={self.report_mode}, "
            f"display_no_filename={self.display_no_filename}, "
            f"display_inner_filename={self.display_inner_filename})"
        )

    def __str__(self) -> str:
        return repr(self)

    @property
    def use_byte_regex(self) -> bool:
        """
        Is the regex applied to bytes (raw-text search) rather than str?
        """
        return self.search_mode == GrepSearchSubstrate.RAW_TEXT

    @property
    def report_hit_lines(self) -> bool:
        """
        Are we reporting content that matches?
        """
        return self.report_mode == GrepReportContent.CONTENTS_MATCHING

    @property
    def report_miss_lines(self) -> bool:
        """
        Are we reporting content that does not match?
        """
        return self.report_mode == GrepReportContent.CONTENTS_NOT_MATCHING

    @property
    def report_files_with_matches(self) -> bool:
        """
        Are we reporting the names of files containing a match?
        """
        return self.report_mode == GrepReportContent.FILENAMES_MATCHING

    @property
    def report_files_without_match(self) -> bool:
        """
        Are we reporting the names of files containing no match?
        """
        return self.report_mode == GrepReportContent.FILENAMES_NOT_MATCHING
def report_hit_filename(
    zipfilename: str, inner_filename: str, display_inner_filename: bool
) -> None:
    """
    Print the name of a file that contained a "hit".

    Args:
        zipfilename:
            Filename of the outer OpenXML/zip file.
        inner_filename:
            Filename of the inner file.
        display_inner_filename:
            If True, print the outer filename followed by the inner filename
            in square brackets; if False, print only the outer (OpenXML/zip)
            filename.
    """
    label = (
        f"{zipfilename} [{inner_filename}]"
        if display_inner_filename
        else zipfilename
    )
    print(label)
def report_miss_filename(zipfilename: str) -> None:
    """
    Print the name of a file that was a "miss".

    Args:
        zipfilename:
            Filename of the outer OpenXML/zip file.
    """
    print(zipfilename)
def report_line(
    zipfilename: str,
    inner_filename: str,
    line: Union[bytes, str],
    display_no_filename: bool,
    display_inner_filename: bool,
) -> None:
    """
    Print one line of content, optionally prefixed by the filename(s) it
    came from.

    Args:
        zipfilename:
            Filename of the ``.zip`` file.
        inner_filename:
            Filename of the inner file.
        line:
            The line from the inner file.
        display_no_filename:
            Print the line alone, with no filename prefix. Takes priority
            over ``display_inner_filename``.
        display_inner_filename:
            (Only applicable if ``display_no_filename`` is False.) If True,
            prefix with both outer and inner filename; if False, prefix with
            just the outer (OpenXML/zip) filename.
    """
    if display_no_filename:
        output = line
    elif display_inner_filename:
        output = f"{zipfilename} [{inner_filename}]: {line}"
    else:
        output = f"{zipfilename}: {line}"
    print(output)
def parse_zip(zipfilename: str, mode: GrepMode) -> None:
    """
    Implement a "grep within an OpenXML file" for a single OpenXML file, which
    is by definition a ``.zip`` file.

    Results are printed to stdout. Invalid ZIP files and directories are
    logged as warnings and skipped; in "files without match" mode they are
    still reported as having no match, because nothing was found in them.

    Args:
        zipfilename:
            Name of the OpenXML (zip) file.
        mode:
            Object configuring grep-type mode.
    """
    log.debug("Checking OpenXML ZIP: %s", zipfilename)

    # Cache for speed:
    search_mode = mode.search_mode
    regex_search = mode.regex.search
    report_files_with_matches = mode.report_files_with_matches
    report_files_without_match = mode.report_files_without_match
    report_hit_lines = mode.report_hit_lines
    report_miss_lines = mode.report_miss_lines
    display_no_filename = mode.display_no_filename
    display_inner_filename = mode.display_inner_filename

    # Local data:
    found_in_zip = False
    # Have we found something in this zip file? Used for early abort.

    def _check_and_report(
        _innerfilename: str, _candidate: Union[bytes, str]
    ) -> bool:
        """
        Test one candidate against the regex, and report it if required.

        This is called for every candidate, including those that need no
        reporting, to simplify the handling of "invert match" (which requires
        all non-matching lines to be reported).

        Arguments:
            _innerfilename:
                The name of the inner file we are currently searching.
            _candidate:
                The text to test (a raw line, the text of an XML node, or the
                inner filename). It is also what gets printed, if we report
                content.

        Returns:
            Are we done for this ZIP file (should the outer function return)?
        """
        nonlocal found_in_zip
        found_locally = bool(regex_search(_candidate))
        found_in_zip |= found_locally

        if found_in_zip:
            if report_files_with_matches:
                # One hit is enough: report the file and stop.
                report_hit_filename(
                    zipfilename=zipfilename,
                    inner_filename=_innerfilename,
                    display_inner_filename=display_inner_filename,
                )
                return True
            if report_files_without_match:
                # This file has a match, so it will never be reported; there
                # is no point in scanning the rest of it.
                return True

        if (report_hit_lines and found_locally) or (
            report_miss_lines and not found_locally
        ):
            report_line(
                zipfilename=zipfilename,
                inner_filename=_innerfilename,
                line=_candidate,
                display_no_filename=display_no_filename,
                display_inner_filename=display_inner_filename,
            )
        return False

    def _search_inner_file(zf: ZipFile, innerfilename: str) -> bool:
        """
        Deal with a single inner file.

        Arguments:
            zf:
                zip file
            innerfilename:
                inner filename

        Returns:
            Are we done for this ZIP file (should the outer function return)?
        """
        if search_mode == GrepSearchSubstrate.INNER_FILENAME:
            # -----------------------------------------------------------------
            # Search the (inner) filename
            # -----------------------------------------------------------------
            return _check_and_report(innerfilename, innerfilename)

        try:
            if search_mode == GrepSearchSubstrate.RAW_TEXT:
                # -------------------------------------------------------------
                # Search textually, line by line
                # -------------------------------------------------------------
                with zf.open(innerfilename, "r") as file:
                    # Stream the lines rather than reading the whole inner
                    # file into memory. Each line is of type "bytes".
                    for line in file:
                        if _check_and_report(innerfilename, line):
                            return True
            else:
                # -------------------------------------------------------------
                # Search the text contents of XML
                # -------------------------------------------------------------
                with zf.open(innerfilename, "r") as file:
                    data = file.read()
                    try:
                        tree = ElementTree.fromstring(data)
                    except ElementTree.ParseError:
                        log.debug(
                            "... ... skipping (not XML): %s", innerfilename
                        )
                        return False
                    for elem in tree.iter():
                        text = elem.text
                        if not text:
                            continue
                        if _check_and_report(innerfilename, text):
                            return True
        except EOFError:
            # Truncated compressed data: keep whatever was processed before
            # the truncation, and move on to the next inner file.
            log.debug(
                "... ... unexpected end of data in inner file: %s",
                innerfilename,
            )
        except RuntimeError as e:
            log.warning(
                "RuntimeError whilst processing %s [%s]: probably encrypted "
                "contents; error was %r",
                zipfilename,
                innerfilename,
                e,
            )
        return False

    # Process the zip file
    try:
        with ZipFile(zipfilename, "r") as _zf:
            # Iterate through inner files
            for _innerfilename in _zf.namelist():
                log.debug("... checking inner file: %s", _innerfilename)
                if _search_inner_file(_zf, _innerfilename):
                    return
    except (zlib.error, BadZipFile) as exc:
        log.warning("Invalid zip: %s; error was %r", zipfilename, exc)
    except IsADirectoryError:
        log.warning("Skipping directory: %s", zipfilename)

    if report_files_without_match and not found_in_zip:
        report_miss_filename(zipfilename)
def main() -> None:
    """
    Command-line handler for the ``grep_in_openxml`` tool.
    Use the ``--help`` option for help.

    Raises:
        ValueError:
            if mutually exclusive command-line options are combined, or if
            filenames are supplied neither on the command line nor via stdin.
    """
    exe_name = os.path.basename(argv[0]) or "grep_in_openxml"
    parser = ArgumentParser(
        formatter_class=RawDescriptionRichHelpFormatter,
        description=rf"""
Performs a grep (global-regular-expression-print) search of files in OpenXML
format, which is to say inside ZIP files.

TYPICAL USAGE.

To find files in a tree, you can use the "find" tool. For example, to find
all ".docx" files in a directory (or its subdirectories) that contain the
phrase "armadillo country", you could use:

    find <STARTDIR> -type f -name "*.docx" -exec {exe_name} -l "armadillo country" {{}} \;

Or, if you don't need the restriction to ".docx" files, you could use this tool
directly, specifying a directory and "--recursive", as in

    {exe_name} -l --recursive "armadillo country" <STARTDIR>

CHAINING.

Note that you can chain. For example, to find both "Laurel" and "Hardy" in
DOC/DOCX documents, in case-insensitive fashion:

    find . -type f -iname "*.doc*" -exec {exe_name} -l -i "laurel" {{}} \; | {exe_name} -x -l -i "hardy"
""",  # noqa: E501
    )
    parser.add_argument("pattern", help="Regular expression pattern to apply.")
    parser.add_argument(
        "filename",
        nargs="*",
        help="File(s) to check. You can also specify directories if you use "
        "--recursive",
    )
    parser.add_argument(
        "--filenames_from_stdin",
        "-x",
        action="store_true",
        help="Take filenames from stdin instead, one line per filename "
        "(useful for chained grep).",
    )
    parser.add_argument(
        "--recursive",
        action="store_true",
        help="Allow search to descend recursively into any directories "
        "encountered.",
    )
    # Flag abbreviations to match grep:
    parser.add_argument(
        "--ignore_case", "-i", action="store_true", help="Ignore case"
    )
    parser.add_argument(
        "--invert_match",
        "-v",
        action="store_true",
        help="Invert match (show content lines not matching the search "
        "pattern)",
    )
    parser.add_argument(
        "--files_with_matches",
        "-l",
        action="store_true",
        help="Show filenames of files with matches",
    )
    parser.add_argument(
        "--files_without_match",
        "-L",
        action="store_true",
        help="Show filenames of files with no match",
    )
    parser.add_argument(
        "--grep_inner_file_name",
        action="store_true",
        help="Search the NAMES of the inner files, not their contents.",
    )
    parser.add_argument(
        "--grep_raw_text",
        action="store_true",
        help="Search the raw text, not the XML node text contents.",
    )
    parser.add_argument(
        "--no_filename",
        action="store_true",
        help="For hits, omit the filename of the OpenXML file.",
    )
    parser.add_argument(
        "--show_inner_filename",
        action="store_true",
        help="For hits, show the filenames of inner files, within each "
        "OpenXML (ZIP) file. Ignored if --no_filename is true.",
    )
    parser.add_argument(
        "--nprocesses",
        type=int,
        default=multiprocessing.cpu_count(),
        help="Specify the number of processes to run in parallel.",
    )
    parser.add_argument(
        "--verbose", action="store_true", help="Verbose output"
    )
    args = parser.parse_args()

    # Check for mutually exclusive options
    if args.grep_raw_text and args.grep_inner_file_name:
        raise ValueError(
            "Can't specify both --grep_raw_text and --grep_inner_file_name"
        )
    n_report_booleans = sum(
        [
            args.invert_match,
            args.files_with_matches,
            args.files_without_match,
        ]
    )
    if n_report_booleans > 1:
        raise ValueError(
            "Specify at most one of --invert_match (-v), "
            "--files_with_matches (-l), "
            "--files_without_match (-L)"
        )
    # Filenames must come from exactly one place: this also rejects the case
    # where neither source is given.
    if bool(args.filenames_from_stdin) == bool(args.filename):
        raise ValueError(
            "Specify exactly one of: --filenames_from_stdin, or filenames on "
            "the command line"
        )

    main_only_quicksetup_rootlogger(
        level=logging.DEBUG if args.verbose else logging.INFO
    )
    mode = GrepMode(
        pattern=args.pattern,
        ignore_case=args.ignore_case,
        search_raw_text=args.grep_raw_text,
        search_inner_filename=args.grep_inner_file_name,
        report_invert_match=args.invert_match,
        report_files_with_matches=args.files_with_matches,
        report_files_without_match=args.files_without_match,
        display_no_filename=args.no_filename,
        display_inner_filename=args.show_inner_filename,
    )
    log.debug(f"Mode: {mode}")

    # Iterate through files
    # - Common arguments
    common_kwargs = dict(mode=mode)
    # - Filenames, as iterator
    if args.filenames_from_stdin:
        # Stream stdin line by line, rather than waiting for all of it.
        line_it = (line.strip() for line in stdin)
        zipfilename_it = filter(None, line_it)  # remove any blanks
    else:
        zipfilename_it = gen_filenames(
            starting_filenames=args.filename, recursive=args.recursive
        )
    # - Combined arguments, as iterator
    arg_it = (
        dict(zipfilename=zipfilename, **common_kwargs)
        for zipfilename in zipfilename_it
    )

    if args.nprocesses == 1:
        # Force serial processing (useful for debugging).
        for kwargs in arg_it:
            parse_zip(**kwargs)
    else:
        # Set up pool for parallel processing. The "with" block ensures that
        # worker processes are cleaned up even if submitting jobs fails.
        with multiprocessing.Pool(processes=args.nprocesses) as pool:
            # Launch in parallel
            jobs = [
                pool.apply_async(parse_zip, [], kwargs) for kwargs in arg_it
            ]
            # Stop entry to the pool (close) and wait for children (join).
            # See https://stackoverflow.com/questions/38271547/.
            pool.close()
            pool.join()
            # Collect results, re-raising any exceptions. (Otherwise they
            # will be invisible.)
            # See https://stackoverflow.com/questions/6728236/.
            for j in jobs:
                j.get()
# Run the command-line tool only when this file is executed as a script, not
# when it is imported.
if __name__ == "__main__":
    main()