# Source code for cardinal_pythonlib.extract_text

#!/usr/bin/env python
# cardinal_pythonlib/extract_text.py

"""
===============================================================================

    Original code copyright (C) 2009-2022 Rudolf Cardinal (rudolf@pobox.com).

    This file is part of cardinal_pythonlib.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        https://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

===============================================================================

**Converts a bunch of stuff to text, either from external files or from
in-memory binary objects (BLOBs).**

Prerequisites:

.. code-block:: bash

    sudo apt-get install antiword  # required for DOC
    sudo apt-get install pdftotext  # optional, but best way for PDF
    sudo apt-get install strings  # strings/strings2 needed as generic fallback
    sudo apt-get install strings2  # as above
    sudo apt-get install unrtf  # required for RTF

    pip install chardet  # improves character type detection
    pip install pdfminer.six  # optional, backup option for PDF

- Author: Rudolf Cardinal (rudolf@pobox.com)
- Created: Feb 2015
- Last update: 24 Sep 2015

See also:

- Word

  - https://stackoverflow.com/questions/125222
  - https://stackoverflow.com/questions/424822

- PDF

  - https://stackoverflow.com/questions/25665
  - https://pypi.python.org/pypi/slate
  - https://stackoverflow.com/questions/5725278

- RTF

  - unrtf
  - https://superuser.com/questions/243084/rtf-to-txt-on-unix

- Multi-purpose:

  - https://pypi.python.org/pypi/fulltext/
  - https://media.readthedocs.org/pdf/textract/latest/textract.pdf

- DOCX

  - https://etienned.github.io/posts/extract-text-from-word-docx-simply/

"""


# =============================================================================
# Imports
# =============================================================================

import argparse
import base64
from email import policy
from email.message import EmailMessage
from email.parser import BytesParser
from io import StringIO
import io
import logging
from mimetypes import guess_extension
import os
import re
import shutil
import subprocess
import sys
import textwrap
from typing import (
    Any,
    BinaryIO,
    Dict,
    Generator,
    Iterable,
    Iterator,
    List,
    Optional,
    TYPE_CHECKING,
)
from xml.etree import ElementTree as ElementTree
import zipfile

import bs4
import chardet
from chardet.universaldetector import UniversalDetector
from extract_msg import openMsg
import pdfminer  # pip install pdfminer.six
import pdfminer.pdfinterp
import pdfminer.converter
import pdfminer.layout
import pdfminer.pdfpage
import prettytable
from semantic_version import Version

from cardinal_pythonlib.logs import get_brace_style_log_with_null_handler

if TYPE_CHECKING:
    from extract_msg import MSGFile

log = get_brace_style_log_with_null_handler(__name__)


# =============================================================================
# Constants
# =============================================================================

# Dictionary key names; presumably used by the extension-to-converter map
# defined later in this file (not visible from here) -- TODO confirm.
AVAILABILITY = "availability"
CONVERTER = "converter"
DEFAULT_WIDTH = 120  # default word-wrapping width for TextProcessingConfig
DEFAULT_MIN_COL_WIDTH = 15  # default minimum table column width
SYS_ENCODING = sys.getdefaultencoding()  # default for decoding command output
ENCODING = "utf-8"  # passed to pdfminer's TextConverter and to PrettyTable


# =============================================================================
# External tool map
# =============================================================================

# Maps each external tool's name to the path of its executable, as found on
# the PATH when this module is imported (shutil.which() gives None if the
# tool is not found). Can be amended later via update_external_tools().
tools = {
    "antiword": shutil.which("antiword"),  # sudo apt-get install antiword
    "pdftotext": shutil.which("pdftotext"),  # core part of Linux?
    "strings": shutil.which("strings"),  # part of standard Unix
    "strings2": shutil.which("strings2"),
    # ... Windows: https://technet.microsoft.com/en-us/sysinternals/strings.aspx  # noqa: E501
    # ... Windows: http://split-code.com/strings2.html
    "unrtf": shutil.which("unrtf"),  # sudo apt-get install unrtf
}


def does_unrtf_support_quiet() -> bool:
    """
    Does the installed ``unrtf`` tool support the ``--quiet`` argument?

    The unrtf tool supports the '--quiet' argument from a version that I'm not
    quite sure of, where ``0.19.3 < version <= 0.21.9``. We check against
    0.21.9 here.

    This function is called at import time, so it must never raise merely
    because unrtf is missing, broken, or reports an unusual version string.

    Returns:
        ``True`` if unrtf is on the PATH and reports a version of at least
        0.21.9; ``False`` if unrtf is absent, cannot be executed, prints no
        version information, or prints a version that cannot be parsed.
    """
    required_unrtf_version = Version("0.21.9")
    # ... probably: http://hg.savannah.gnu.org/hgweb/unrtf/
    # ... 0.21.9 definitely supports --quiet
    # ... 0.19.3 definitely doesn't support it
    unrtf_filename = shutil.which("unrtf")
    if not unrtf_filename:
        return False
    try:
        # Run the executable that was actually located on the PATH.
        p = subprocess.Popen(
            [unrtf_filename, "--version"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        _, err_bytes = p.communicate()
    except OSError:
        # Found but not runnable (e.g. permissions): be conservative.
        return False
    # The version information is read from stderr.
    text = err_bytes.decode(sys.getdefaultencoding(), errors="replace")
    words = text.split()  # whitespace-separated tokens; the first is used
    if not words:
        return False
    try:
        unrtf_version = Version(words[0])
    except ValueError:
        # Not a parseable semantic version; assume "--quiet" is unsupported
        # rather than crashing at import time.
        return False
    return unrtf_version >= required_unrtf_version
# Evaluated once, when this module is imported (this runs "unrtf --version"
# if unrtf is found on the PATH).
UNRTF_SUPPORTS_QUIET = does_unrtf_support_quiet()
def update_external_tools(tooldict: Dict[str, str]) -> None:
    """
    Merge new entries into the module-level map of external tools.

    Existing entries with the same name are overwritten.

    Args:
        tooldict: dictionary whose keys are tool names and whose values are
            paths to the executables
    """
    # The dictionary is mutated in place (never rebound), so no "global"
    # declaration is required.
    tools.update(tooldict)
# ============================================================================= # Text-processing config class # =============================================================================
class TextProcessingConfig(object):
    """
    Bundle of control parameters for text extraction, so that we don't have to
    pass a lot of mysterious ``**kwargs`` around and lose track of what they
    mean.

    All converter functions take one of these objects as a parameter.
    """

    def __init__(
        self,
        encoding: str = None,
        width: int = DEFAULT_WIDTH,
        min_col_width: int = DEFAULT_MIN_COL_WIDTH,
        plain: bool = False,
        semiplain: bool = False,
        docx_in_order: bool = True,
        horizontal_char: str = "─",
        vertical_char: str = "│",
        junction_char: str = "┼",
        plain_table_start: str = None,
        plain_table_end: str = None,
        plain_table_col_boundary: str = None,
        plain_table_row_boundary: str = None,
        rstrip: bool = True,
    ) -> None:
        """
        Args:
            encoding:
                optional text file encoding to try in addition to
                :func:`sys.getdefaultencoding`.
            width:
                overall word-wrapping width
            min_col_width:
                minimum column width for tables
            plain:
                as plain as possible (e.g. for natural language processing);
                see :func:`docx_process_table`. Takes priority over
                ``semiplain`` if both are set.
            semiplain:
                quite plain, but with some ASCII art representation of the
                table structure.
            docx_in_order:
                for DOCX files: if ``True``, process paragraphs and tables in
                the order they occur; if ``False``, process all paragraphs
                followed by all tables
            horizontal_char:
                horizontal character to use with PrettyTable, e.g. ``-`` or
                ``─``
            vertical_char:
                vertical character to use with PrettyTable, e.g. ``|`` or
                ``│``
            junction_char:
                junction character to use with PrettyTable, e.g. ``+`` or
                ``┼``
            plain_table_start:
                table start line to use with ``plain=True``
            plain_table_end:
                table end line to use with ``plain=True``
            plain_table_col_boundary:
                boundary between columns to use with ``plain=True``
            plain_table_row_boundary:
                boundary between rows to use with ``plain=True``
            rstrip:
                Right-strip whitespace from all lines?

        Example of a DOCX table processed with:

        - ``plain=False, semiplain=False``

          .. code-block:: none

            ┼─────────────┼─────────────┼
            │ Row 1 col 1 │ Row 1 col 2 │
            ┼─────────────┼─────────────┼
            │ Row 2 col 1 │ Row 2 col 2 │
            ┼─────────────┼─────────────┼

        - ``plain=False, semiplain=True``

          .. code-block:: none

            ─────────────────────────────
             Row 1 col 1
            ─────────────────────────────
                           Row 1 col 2
            ─────────────────────────────
             Row 2 col 1
            ─────────────────────────────
                           Row 2 col 2
            ─────────────────────────────

        - ``plain=True``

          .. code-block:: none

            ╔═════════════════════════════════════════════════════════════════╗
            Row 1 col 1
            ───────────────────────────────────────────────────────────────────
            Row 1 col 2
            ═══════════════════════════════════════════════════════════════════
            Row 2 col 1
            ───────────────────────────────────────────────────────────────────
            Row 2 col 2
            ╚═════════════════════════════════════════════════════════════════╝

        The plain format is probably better, in general, for NLP, and is
        definitely clearer with nested tables (for which the word-wrapping
        algorithm is imperfect).

        We avoid "heavy" box drawing as it has a higher chance of being mangled
        under Windows.
        """
        if plain and semiplain:
            log.warning("You specified both plain and semiplain; using plain")
            semiplain = False

        # Width of the run between the two corner characters of the default
        # table start/end lines; falls back to 77 for widths of 2 or less.
        inner_width = 77 if width <= 2 else width - 2
        full_width = inner_width + 2

        # Fill in any plain-table decorations that the caller did not supply.
        if plain_table_start is None:
            # double line with corners
            plain_table_start = "╔" + ("═" * inner_width) + "╗"
        if plain_table_end is None:
            plain_table_end = "╚" + ("═" * inner_width) + "╝"
        if plain_table_row_boundary is None:
            # double line, full width
            plain_table_row_boundary = "═" * full_width
        if plain_table_col_boundary is None:
            # light line, full width
            plain_table_col_boundary = "─" * full_width

        # General settings
        self.encoding = encoding
        self.width = width
        self.min_col_width = min_col_width
        self.rstrip = rstrip

        # Table style selection
        self.plain = plain
        self.semiplain = semiplain
        self.docx_in_order = docx_in_order

        # PrettyTable characters
        self.horizontal_char = horizontal_char
        self.vertical_char = vertical_char
        self.junction_char = junction_char

        # Plain-table decorations
        self.plain_table_start = plain_table_start
        self.plain_table_end = plain_table_end
        self.plain_table_col_boundary = plain_table_col_boundary
        self.plain_table_row_boundary = plain_table_row_boundary
# Shared default used for the "config" parameter of the converter functions.
_DEFAULT_CONFIG = TextProcessingConfig()


# =============================================================================
# Support functions
# =============================================================================
def get_filelikeobject(filename: str = None, blob: bytes = None) -> BinaryIO:
    """
    Return a binary file-like object for either a disk file or a BLOB.

    Guard the use of this function with ``with``.

    Args:
        filename: for specifying via a filename
        blob: for specifying via an in-memory ``bytes`` object

    Returns:
        a :class:`BinaryIO` object

    Raises:
        ValueError: if neither source is given, or if both a filename and a
            non-empty blob are given
    """
    have_filename = bool(filename)
    if not have_filename and blob is None:
        raise ValueError("no filename and no blob")
    if have_filename and blob:
        raise ValueError("specify either filename or blob")
    return open(filename, "rb") if have_filename else io.BytesIO(blob)
# noinspection PyUnusedLocal
def get_file_contents(filename: str = None, blob: bytes = None) -> bytes:
    """
    Fetch binary contents, either by reading a file or by handing back a BLOB.

    Args:
        filename: name of the file to read
        blob: in-memory ``bytes`` object, returned unchanged if supplied

    Returns:
        the binary contents

    Raises:
        ValueError: if neither source is given, or if both a filename and a
            non-empty blob are given
    """
    if blob is None:
        # Must come from disk.
        if filename is None:
            raise ValueError("no filename and no blob")
        with open(filename, "rb") as f:
            return f.read()
    if filename and blob:
        raise ValueError("specify either filename or blob")
    return blob
def get_chardet_encoding(binary_contents: bytes) -> Optional[str]:
    """
    Guess the character set encoding of the specified ``binary_contents``.

    Args:
        binary_contents: the data whose encoding is to be guessed

    Returns:
        the name of the guessed encoding, or ``None`` if there is no data, if
        chardet is unavailable, or if chardet could not make a guess
    """
    if not binary_contents:
        return None
    if chardet is None or UniversalDetector is None:
        log.warning("chardet not installed; limits detection of encodings")
        return None
    # METHOD 1
    # http://chardet.readthedocs.io/en/latest/
    #
    # guess = chardet.detect(binary_contents)
    #
    # METHOD 2: faster with large files
    # http://chardet.readthedocs.io/en/latest/
    # https://stackoverflow.com/questions/13857856/split-byte-string-into-lines

    # noinspection PyCallingNonCallable
    detector = UniversalDetector()
    # Feed line by line, but keep the line endings, so that the detector sees
    # exactly the original byte stream (dropping bytes would misalign
    # multi-byte encodings).
    for byte_line in binary_contents.splitlines(keepends=True):
        detector.feed(byte_line)
        if detector.done:
            break
    # The detector must be closed before its result is read: unless it
    # finished early ("done"), the final guess is only computed by close().
    detector.close()
    guess = detector.result
    # Handle result
    if "encoding" not in guess:
        log.warning("Something went wrong within chardet; no encoding")
        return None
    return guess["encoding"]
def get_file_contents_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Fetch the contents of a file, or of a BLOB, decoded to a string.

    Encodings are tried in this order: the one named in ``config`` (if any),
    the system default, and then chardet's best guess.

    Args:
        filename: name of the file to read
        blob: in-memory ``bytes`` object to decode
        config: :class:`TextProcessingConfig` control object

    Returns:
        the decoded text

    Raises:
        ValueError: if no encoding could be established
    """
    binary_contents = get_file_contents(filename=filename, blob=blob)

    # 1. The encoding the user specified; 2. the system encoding (unless it
    #    is the same thing).
    candidates = []  # type: List[str]
    if config.encoding:
        candidates.append(config.encoding)
    sysdef = sys.getdefaultencoding()
    if sysdef != config.encoding:
        candidates.append(sysdef)
    for candidate in candidates:
        try:
            return binary_contents.decode(candidate)
        except ValueError:  # of which UnicodeDecodeError is more specific
            # ... https://docs.python.org/3/library/codecs.html
            continue

    # 3. Try the best guess from chardet
    #    http://chardet.readthedocs.io/en/latest/usage.html
    if chardet:
        guessed_encoding = chardet.detect(binary_contents)["encoding"]
        if guessed_encoding:
            return binary_contents.decode(guessed_encoding)

    source = f"filename={filename!r}" if filename else "blob"
    raise ValueError("Unknown encoding ({})".format(source))
def get_cmd_output(*args: Any, encoding: str = SYS_ENCODING) -> str:
    """
    Run a command and return what it wrote to stdout, as text.

    Args:
        args: the command and its arguments
        encoding: encoding used to decode the command's output; bytes that
            cannot be decoded are dropped

    Returns:
        the decoded stdout of the command
    """
    log.debug("get_cmd_output(): args = {!r}", args)
    process = subprocess.Popen(args, stdout=subprocess.PIPE)
    stdout_bytes = process.communicate()[0]  # stderr is not captured
    return stdout_bytes.decode(encoding, errors="ignore")
def get_cmd_output_from_stdin(
    stdint_content_binary: bytes, *args: Any, encoding: str = SYS_ENCODING
) -> str:
    """
    Run a command, feed it binary data via stdin, and return what it wrote to
    stdout, as text.

    Args:
        stdint_content_binary: the data to send to the command's stdin
        args: the command and its arguments
        encoding: encoding used to decode the command's output; bytes that
            cannot be decoded are dropped

    Returns:
        the decoded stdout of the command
    """
    process = subprocess.Popen(
        args, stdin=subprocess.PIPE, stdout=subprocess.PIPE
    )
    stdout_bytes = process.communicate(input=stdint_content_binary)[0]
    return stdout_bytes.decode(encoding, errors="ignore")
def rstrip_all_lines(text: str) -> str:
    """
    Remove trailing whitespace from every line of a string.

    The lines are rejoined with ``\\n``, so a final line ending in the input
    is not preserved.

    Args:
        text: text to process

    Returns:
        the text with each line right-stripped
    """
    stripped_lines = [line.rstrip() for line in text.splitlines()]
    return "\n".join(stripped_lines)
# ============================================================================= # PDF # ============================================================================= # noinspection PyUnresolvedReferences,PyUnusedLocal
def convert_pdf_to_txt(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts a PDF file to text.
    Pass either a filename or a binary object.

    The external ``pdftotext`` tool is used if it is available; otherwise the
    ``pdfminer`` library is used.

    Args:
        filename: filename to process
        blob: binary ``bytes`` object to process
        config: :class:`TextProcessingConfig` control object (not used here)

    Returns:
        text contents

    Raises:
        AssertionError: if no PDF-reading tool is available
    """
    pdftotext = tools["pdftotext"]
    if pdftotext:
        # External command method. The "-" output argument sends the text to
        # stdout; a "-" input argument reads the PDF from stdin.
        if filename:
            return get_cmd_output(pdftotext, filename, "-")
        return get_cmd_output_from_stdin(blob, pdftotext, "-", "-")

    if not pdfminer:
        raise AssertionError("No PDF-reading tool available")

    # Memory-hogging method
    with get_filelikeobject(filename, blob) as fp:
        resource_manager = pdfminer.pdfinterp.PDFResourceManager()
        text_buffer = StringIO()
        device = pdfminer.converter.TextConverter(
            resource_manager,
            text_buffer,
            codec=ENCODING,
            laparams=pdfminer.layout.LAParams(),
        )
        interpreter = pdfminer.pdfinterp.PDFPageInterpreter(
            resource_manager, device
        )
        pages = pdfminer.pdfpage.PDFPage.get_pages(
            fp,
            set(),  # page numbers
            maxpages=0,
            password="",
            caching=True,
            check_extractable=True,
        )
        for page in pages:
            interpreter.process_page(page)
        return text_buffer.getvalue()
def availability_pdf() -> bool:
    """
    Is a PDF-to-text tool available?

    Logs a warning if we have to fall back from ``pdftotext`` to ``pdfminer``.

    Returns:
        ``True`` if either ``pdftotext`` or ``pdfminer`` can be used
    """
    if tools["pdftotext"]:
        return True
    if not pdfminer:
        return False
    log.warning(
        "PDF conversion: pdftotext missing; "
        "using pdfminer (less efficient)"
    )
    return True
# =============================================================================
# DOCX
# =============================================================================

# -----------------------------------------------------------------------------
# In a D.I.Y. fashion
# -----------------------------------------------------------------------------

# DOCX specification: https://ecma-international.org/publications-and-standards/standards/ecma-376/  # noqa: E501

# Names of the header/document/footer XML parts within the DOCX zip archive.
# The dot before "xml" is escaped so that it matches only a literal ".".
DOCX_HEADER_FILE_REGEX = re.compile(r"word/header[0-9]*\.xml")
DOCX_DOCUMENT_FILE_REGEX = re.compile(r"word/document[0-9]*\.xml")
DOCX_FOOTER_FILE_REGEX = re.compile(r"word/footer[0-9]*\.xml")

DOCX_SCHEMA_URL = (
    "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
)


def docx_qn(tagroot: str) -> str:
    """
    Returns the namespace-qualified name of a DOCX tag, in the ``{uri}tag``
    form that :mod:`xml.etree.ElementTree` uses for element tags.

    Args:
        tagroot: the unqualified tag name, e.g. ``"tbl"``

    Returns:
        the qualified tag name
    """
    return f"{{{DOCX_SCHEMA_URL}}}{tagroot}"


DOCX_TEXT = docx_qn("t")
# https://github.com/python-openxml/python-docx/blob/master/docx/table.py
DOCX_TABLE = docx_qn("tbl")
DOCX_TAB = docx_qn("tab")
DOCX_NEWLINES = [docx_qn("br"), docx_qn("cr")]
DOCX_NEWPARA = docx_qn("p")
DOCX_TABLE_ROW = docx_qn("tr")
DOCX_TABLE_CELL = docx_qn("tc")
def gen_xml_files_from_docx(fp: BinaryIO) -> Iterator[str]:
    """
    Generate XML files (as strings) from a DOCX file.

    All header files are yielded first, then the document file(s), then all
    footer files.

    Args:
        fp: :class:`BinaryIO` object for reading the ``.DOCX`` file

    Yields:
        the string contents of each individual XML file within the ``.DOCX``
        file

    Raises:
        zipfile.BadZipFile: if the zip is unreadable (encrypted?)
    """
    regexes_in_order = (
        DOCX_HEADER_FILE_REGEX,
        DOCX_DOCUMENT_FILE_REGEX,
        DOCX_FOOTER_FILE_REGEX,
    )
    try:
        # The context manager releases the ZipFile's resources when the
        # generator finishes or is closed. It does not close "fp" itself;
        # that remains the caller's responsibility.
        with zipfile.ZipFile(fp) as z:
            filelist = z.namelist()
            for regex in regexes_in_order:
                for filename in filelist:
                    if regex.match(filename):
                        yield z.read(filename).decode("utf8")
    except zipfile.BadZipFile as e:
        # Clarify the error (keeping the original as the cause):
        raise zipfile.BadZipFile(
            "File is not a zip file - encrypted DOCX?"
        ) from e
class DocxFragment:
    """
    A piece of text (a line, or several lines) extracted from a DOCX file,
    together with a flag saying whether it should be word-wrapped.
    """

    # noinspection PyShadowingNames
    def __init__(self, text: str, wordwrap: bool = True) -> None:
        """
        Args:
            text: the text of this fragment
            wordwrap: should this fragment be word-wrapped?
        """
        self.wordwrap = wordwrap
        self.text = text
def docx_gen_wordwrapped_fragments(
    fragments: Iterable[DocxFragment], width: int
) -> Generator[str, None, None]:
    """
    Generates text from fragments, word-wrapping where requested.

    Consecutive wrappable fragments are concatenated and wrapped together as a
    block (line by line); fragments that must not be wrapped are passed
    through unchanged.

    Args:
        fragments: the fragments to process
        width: width to word-wrap to

    Yields:
        strings, each being either a word-wrapped block or the text of an
        unwrapped fragment
    """
    pending = []  # type: List[str]  # text awaiting word-wrapping

    def flush() -> Generator[str, None, None]:
        """
        Yield the accumulated text (if any), word-wrapped, then reset.
        """
        if not pending:
            return
        block = "".join(pending)
        yield "\n".join(wordwrap(line, width) for line in block.splitlines())
        pending.clear()

    for fragment in fragments:
        if fragment.wordwrap:
            # Defer: it joins the current wrapping block.
            pending.append(fragment.text)
            continue
        # An unwrapped fragment: emit whatever is waiting, then the fragment.
        yield from flush()
        yield fragment.text
    yield from flush()  # any leftovers
def docx_wordwrap_fragments(
    fragments: Iterable[DocxFragment], width: int
) -> str:
    """
    Word-wraps fragments as necessary and concatenates the results.

    Args:
        fragments: the fragments to process
        width: width to word-wrap to

    Returns:
        a single string
    """
    pieces = docx_gen_wordwrapped_fragments(fragments, width)
    return "".join(pieces)
def docx_gen_fragments_from_xml_node(
    node: ElementTree.Element, level: int, config: TextProcessingConfig
) -> Generator[DocxFragment, None, None]:
    """
    Generates text fragments from an XML node within a DOCX file.

    Args:
        node: an XML node
        level: current level in XML hierarchy (used for recursion; start level
            is 0)
        config: :class:`TextProcessingConfig` control object

    Yields:
        :class:`DocxFragment` objects, in document order
    """
    tag = node.tag  # for speed
    log.debug("Level {}, tag {}", level, tag)

    # 1. What does this node itself contribute?
    if tag == DOCX_TEXT:
        log.debug("Text: {!r}", node.text)
        yield DocxFragment(node.text or "")
    elif tag == DOCX_TAB:
        log.debug("Tab")
        yield DocxFragment("\t")
    elif tag in DOCX_NEWLINES:
        # rarely used? Mostly "new paragraph"
        log.debug("Newline")
        yield DocxFragment("\n")
    elif tag == DOCX_NEWPARA:
        # Note that e.g. all table cells start with this
        log.debug("New paragraph")
        yield DocxFragment("\n\n")
        # One or two newlines? Clarity better with two -- word-wrapping means
        # that "single" source lines can take up multiple lines in text format.
        # So we need a gap between lines to ensure paragraph separation is
        # visible -- i.e. two newlines.

    # 2. A table is rendered as a unit and never re-wrapped; for anything
    #    else, descend into the children.
    if tag == DOCX_TABLE:
        log.debug("Table")
        yield DocxFragment("\n", wordwrap=False)
        yield DocxFragment(
            docx_table_from_xml_node(node, level, config), wordwrap=False
        )
        return
    for child in node:
        yield from docx_gen_fragments_from_xml_node(child, level + 1, config)
def docx_text_from_xml_node(
    node: ElementTree.Element, level: int, config: TextProcessingConfig
) -> str:
    """
    Extracts word-wrapped text from an XML node within a DOCX file.

    Args:
        node: an XML node
        level: current level in XML hierarchy (used for recursion; start level
            is 0)
        config: :class:`TextProcessingConfig` control object

    Returns:
        contents as a string
    """
    fragments = docx_gen_fragments_from_xml_node(node, level, config)
    return docx_wordwrap_fragments(fragments, config.width)
def docx_text_from_xml(xml: str, config: TextProcessingConfig) -> str:
    """
    Parses the XML of one file from within a DOCX archive and extracts its
    text.

    Args:
        xml: raw XML text
        config: :class:`TextProcessingConfig` control object

    Returns:
        contents as a string
    """
    top_level = 0  # the root node is at level 0
    return docx_text_from_xml_node(
        ElementTree.fromstring(xml), top_level, config
    )
class CustomDocxParagraph:
    """
    One paragraph of text from a DOCX file.
    """

    def __init__(self, text: str = "") -> None:
        """
        Args:
            text: the paragraph's text; anything falsy is stored as ``""``
        """
        self.text = text if text else ""

    def __repr__(self) -> str:
        return "CustomDocxParagraph(text={!r})".format(self.text)
class CustomDocxTableCell:
    """
    One cell of a table in a DOCX file. A cell holds zero or more paragraphs.
    """

    def __init__(self, paragraphs: List[CustomDocxParagraph] = None) -> None:
        """
        Args:
            paragraphs: initial paragraphs; if none (or an empty list) is
                given, the cell starts with a new empty list
        """
        self.paragraphs = paragraphs if paragraphs else []

    def add_paragraph(self, text: str) -> None:
        """
        Appends a new paragraph, with the specified text, to this cell.
        """
        paragraph = CustomDocxParagraph(text)
        self.paragraphs.append(paragraph)

    def __repr__(self) -> str:
        return "CustomDocxTableCell(paragraphs={!r})".format(self.paragraphs)
class CustomDocxTableRow:
    """
    One row of a table in a DOCX file. A row holds zero or more cells (one
    per column).
    """

    def __init__(self, cells: List[CustomDocxTableCell] = None) -> None:
        """
        Args:
            cells: initial cells; if none (or an empty list) is given, the row
                starts with a new empty list
        """
        self.cells = cells if cells else []

    def add_cell(self, cell: CustomDocxTableCell) -> None:
        """
        Appends an existing cell to this row.
        """
        self.cells.append(cell)

    def new_cell(self) -> None:
        """
        Appends a new, empty cell to this row.
        """
        self.add_cell(CustomDocxTableCell())

    def add_paragraph(self, text: str) -> None:
        """
        Adds a paragraph to the last cell of this row. (There must already be
        at least one cell.)
        """
        last_cell = self.cells[-1]
        last_cell.add_paragraph(text)

    def __repr__(self) -> str:
        return "CustomDocxTableRow(cells={!r})".format(self.cells)
class CustomDocxTable:
    """
    A table from a DOCX file. A table holds zero or more rows.
    """

    def __init__(self, rows: List[CustomDocxTableRow] = None) -> None:
        """
        Args:
            rows: initial rows; if none (or an empty list) is given, the table
                starts with a new empty list
        """
        self.rows = rows if rows else []

    def add_row(self, row: CustomDocxTableRow) -> None:
        """
        Appends an existing row to this table.
        """
        self.rows.append(row)

    def new_row(self) -> None:
        """
        Appends a new, empty row to this table.
        """
        self.add_row(CustomDocxTableRow())

    def new_cell(self) -> None:
        """
        Appends a new, empty cell to the last row. (There must already be at
        least one row.)
        """
        last_row = self.rows[-1]
        last_row.new_cell()

    def add_paragraph(self, text: str) -> None:
        """
        Adds a paragraph to the last cell of the last row. (There must already
        be at least one row containing at least one cell.)
        """
        last_row = self.rows[-1]
        last_row.add_paragraph(text)

    def __repr__(self) -> str:
        return "CustomDocxTable(rows={!r})".format(self.rows)
def docx_table_from_xml_node(
    table_node: ElementTree.Element, level: int, config: TextProcessingConfig
) -> str:
    """
    Builds a textual representation of a DOCX table from its XML node.

    Only row nodes directly beneath the table, and cell nodes directly
    beneath those rows, are considered; other children are ignored.

    Args:
        table_node: XML node
        level: current level in XML hierarchy (used for recursion; start level
            is 0)
        config: :class:`TextProcessingConfig` control object

    Returns:
        string representation
    """
    table = CustomDocxTable()
    row_nodes = (n for n in table_node if n.tag == DOCX_TABLE_ROW)
    for row_node in row_nodes:
        table.new_row()
        cell_nodes = (n for n in row_node if n.tag == DOCX_TABLE_CELL)
        for cell_node in cell_nodes:
            table.new_cell()
            for para_node in cell_node:
                para_text = docx_text_from_xml_node(para_node, level, config)
                if para_text:  # skip empty paragraphs
                    table.add_paragraph(para_text)
    return docx_process_table(table, config)
# ----------------------------------------------------------------------------- # Generic # -----------------------------------------------------------------------------
def wordwrap(text: str, width: int) -> str:
    """
    Word-wraps text.

    Args:
        text: text to process (will be treated as a single line)
        width: width to word-wrap to (or 0 to skip word wrapping)

    Returns:
        wrapped text (an empty string if there is no text)

    .. code-block:: python

        from cardinal_pythonlib.extract_text import *
        text = "Here is a very long line that may be word-wrapped. " * 50
        print(wordwrap(text, 80))

    """
    if not text:
        return ""
    if not width:
        # Word wrapping is disabled.
        return text
    wrapped_lines = textwrap.wrap(text, width=width)
    return "\n".join(wrapped_lines)
def docx_process_table(
    table: CustomDocxTable, config: TextProcessingConfig
) -> str:
    """
    Renders a DOCX table as text.

    Structure representing a DOCX table:

    .. code-block:: none

        table
            .rows[]
                .cells[]
                    .paragraphs[]
                        .text

    That's the structure of a :class:`docx.table.Table` object, but also of our
    homebrew creation, :class:`CustomDocxTable`.

    - The ``plain`` and ``semiplain`` options are implemented via the
      :class:`TextProcessingConfig`.

    - Note also that the grids in DOCX files can have varying number of cells
      per row, e.g.

      .. code-block:: none

            +---+---+---+
            | 1 | 2 | 3 |
            +---+---+---+
            | 1 | 2 |
            +---+---+

    Args:
        table: the table to render
        config: :class:`TextProcessingConfig` control object

    Returns:
        string representation of the table
    """

    def get_cell_text(cell_) -> str:
        """
        Returns the non-blank paragraphs of a cell, each stripped, separated
        by blank lines.
        """
        stripped = (paragraph.text.strip() for paragraph in cell_.paragraphs)
        return "\n\n".join(s for s in stripped if s)

    if config.plain:
        # ---------------------------------------------------------------------
        # Plain -- good for NLP and better for word-wrapping
        # ---------------------------------------------------------------------
        lines = [config.plain_table_start]  # type: List[str]
        for rownum, row in enumerate(table.rows):
            if rownum > 0:
                lines.append(config.plain_table_row_boundary)
            for colnum, cell in enumerate(row.cells):
                if colnum > 0:
                    lines.append(config.plain_table_col_boundary)
                lines.append(get_cell_text(cell))
        lines.append(config.plain_table_end)
        return "\n".join(lines)

    # -------------------------------------------------------------------------
    # Full table visualization, or semiplain
    # -------------------------------------------------------------------------
    # The widest row determines the number of columns (minimum 1).
    # noinspection PyTypeChecker
    ncols = max([1] + [len(row.cells) for row in table.rows])
    vrules = prettytable.NONE if config.semiplain else prettytable.ALL
    pt = prettytable.PrettyTable(
        field_names=list(range(ncols)),
        encoding=ENCODING,
        header=False,
        border=True,
        hrules=prettytable.ALL,
        vrules=vrules,
        # Can we use UTF-8 special characters?
        # Even under Windows, sys.getdefaultencoding() returns "utf-8"
        # (under Python 3.6.8, Windows 6.1.7601 = Windows Server 2008 R2).
        # The advantage would be that these characters are not likely to
        # influence any form of NLP.
        horizontal_char=config.horizontal_char,  # default "-"
        vertical_char=config.vertical_char,  # default "|"
        junction_char=config.junction_char,  # default "+"
    )
    pt.align = "l"
    pt.valign = "t"
    pt.max_width = max(config.width // ncols, config.min_col_width)

    if config.semiplain:
        # One PrettyTable row per cell, with the cell's text in its own
        # column and every other column blank.
        # noinspection PyTypeChecker
        for row in table.rows:
            for colnum, cell in enumerate(row.cells):
                # ... use ncols, not len(row.cells), since "cells per row"
                #     is not constant, but prettytable wants a fixed
                #     number. (changed in v0.2.8)
                ptrow = [""] * ncols
                ptrow[colnum] = get_cell_text(cell)
                assert len(ptrow) == ncols
                pt.add_row(ptrow)
    else:
        # One PrettyTable row per table row, padded to the full width.
        # noinspection PyTypeChecker
        for row in table.rows:
            ptrow = [get_cell_text(cell) for cell in row.cells]
            ptrow.extend([""] * (ncols - len(ptrow)))  # added in v0.2.8
            assert len(ptrow) == ncols
            pt.add_row(ptrow)
    return pt.get_string()
# ----------------------------------------------------------------------------- # DOCX # ----------------------------------------------------------------------------- # noinspection PyUnusedLocal
def convert_docx_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts a DOCX file to text.
    Pass either a filename or a binary object.

    Args:
        filename: filename to process
        blob: binary ``bytes`` object to process
        config: :class:`TextProcessingConfig` control object

    Returns:
        text contents

    Notes:

    - Old ``docx`` (https://pypi.python.org/pypi/python-docx) has been
      superseded (see https://github.com/mikemaccana/python-docx).

      - ``docx.opendocx(file)`` uses :class:`zipfile.ZipFile`, which can take
        either a filename or a file-like object
        (https://docs.python.org/2/library/zipfile.html).

      - Method was:

        .. code-block:: python

            with get_filelikeobject(filename, blob) as fp:
                document = docx.opendocx(fp)
                paratextlist = docx.getdocumenttext(document)
            return '\\n\\n'.join(paratextlist)

    - Newer ``docx`` is python-docx

      - https://pypi.python.org/pypi/python-docx
      - https://python-docx.readthedocs.org/en/latest/
      - https://stackoverflow.com/questions/25228106

      However, it uses ``lxml``, which has C dependencies, so it doesn't always
      install properly on e.g. bare Windows machines.

      PERFORMANCE of my method:

      - nice table formatting
      - but tables grouped at end, not in sensible places
      - can iterate via ``doc.paragraphs`` and ``doc.tables`` but not in
        true document order, it seems
      - others have noted this too:

        - https://github.com/python-openxml/python-docx/issues/40
        - https://github.com/deanmalmgren/textract/pull/92

    - ``docx2txt`` is at https://pypi.python.org/pypi/docx2txt/0.6; this is
      pure Python. Its command-line function appears to be for Python 2 only
      (2016-04-21: crashes under Python 3; is due to an encoding bug). However,
      it seems fine as a library. It doesn't handle in-memory blobs properly,
      though, so we need to extend it.

      PERFORMANCE OF ITS ``process()`` function:

      - all text comes out
      - table text is in a sensible place
      - table formatting is lost.

    - Other manual methods (not yet implemented):
      https://etienned.github.io/posts/extract-text-from-word-docx-simply/.

      Looks like it won't deal with header stuff (etc.) that ``docx2txt``
      handles.

    - Upshot: we need a DIY version.

    - See also this "compile lots of techniques" libraries, which has C
      dependencies: https://textract.readthedocs.org/en/latest/

    """
    with get_filelikeobject(filename, blob) as fp:
        # Each XML part of the archive (headers, document, footers) is
        # converted separately; the results are concatenated.
        return "".join(
            docx_text_from_xml(xml, config)
            for xml in gen_xml_files_from_docx(fp)
        )
# ============================================================================= # ODT # ============================================================================= # noinspection PyUnusedLocal
def convert_odt_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts an OpenOffice ODT file to text.

    Pass either a filename or a binary object.

    Args:
        filename: filename to process
        blob: binary ``bytes`` object to process
        config: :class:`TextProcessingConfig` control object (not used here)

    Returns:
        the text fragments found in the document's ``content.xml``, in
        document order, separated by blank lines

    Raises:
        zipfile.BadZipFile: if the file is not a readable zip archive
        KeyError: if the archive has no ``content.xml``
    """
    # We can't use exactly the same method as for DOCX files, using docx:
    # sometimes that works, but sometimes it falls over with:
    # KeyError: "There is no item named 'word/document.xml' in the archive"
    with get_filelikeobject(filename, blob) as fp:
        with zipfile.ZipFile(fp) as z:
            tree = ElementTree.fromstring(z.read("content.xml"))
            # ... may raise zipfile.BadZipfile
    # itertext() gives every element's text AND its tail (the text that
    # follows a child element, e.g. after an inline span), in document order.
    # Looking at ".text" alone would silently lose the tail text.
    stripped = (fragment.strip() for fragment in tree.itertext())
    # Whitespace-only fragments carry no content; skip them.
    textlist = [s for s in stripped if s]  # type: List[str]
    return "\n\n".join(textlist)
# ============================================================================= # HTML # ============================================================================= # noinspection PyUnusedLocal
def convert_html_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts HTML to text.

    Args:
        filename: filename to process
        blob: binary ``bytes`` object to process
        config: :class:`TextProcessingConfig` control object (not used here)

    Returns:
        text contents
    """
    # https://bugs.launchpad.net/beautifulsoup/+bug/2110492
    # beautifulsoup4==4.13.4 returns "b''" for an empty bytes array
    # So we just workaround this here:
    if blob is not None and len(blob) == 0:
        return ""

    with get_filelikeobject(filename, blob) as fp:
        raw_text = bs4.BeautifulSoup(fp, "html.parser").get_text()

    # In the real world we can end up with UTF-16 characters embedded as
    # numbered entities in Windows-1252 encoded HTML such as
    # &#55357;&#56898; "Slightly smiling face". Replacing these here
    # avoids "UnicodeEncodeError: 'utf-8' codec can't encode characters in
    # position ... surrogates not allowed".
    encoded = raw_text.encode(errors="replace")
    return encoded.decode()
# ============================================================================= # XML # ============================================================================= # noinspection PyUnusedLocal
def convert_xml_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts XML to text.

    Args:
        filename: filename to process
        blob: binary ``bytes`` object to process
        config: :class:`TextProcessingConfig` control object (not used here)

    Returns:
        text contents
    """
    with get_filelikeobject(filename, blob) as fp:
        return bs4.BeautifulSoup(fp, features="xml").get_text()
# ============================================================================= # RTF # ============================================================================= # noinspection PyUnresolvedReferences,PyUnusedLocal
def convert_rtf_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts RTF to text, via the external ``unrtf`` tool.

    Args:
        filename: filename to process
        blob: binary ``bytes`` object to process (sent to the tool via stdin)
        config: :class:`TextProcessingConfig` control object (not used here)

    Returns:
        text contents

    Raises:
        AssertionError: if no RTF-reading tool is available
    """
    unrtf = tools["unrtf"]
    if not unrtf:
        raise AssertionError("No RTF-reading tool available")
    # Best
    args = [unrtf, "--text", "--nopict"]
    if UNRTF_SUPPORTS_QUIET:
        args.append("--quiet")
    if filename:
        return get_cmd_output(*args, filename)
    return get_cmd_output_from_stdin(blob, *args)
def availability_rtf() -> bool:
    """
    Reports whether an RTF processor (``unrtf``) is registered in ``tools``.
    """
    return bool(tools["unrtf"])
# ============================================================================= # DOC # ============================================================================= # noinspection PyUnusedLocal
def convert_doc_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Extracts text from a Microsoft Word DOC file by running the external
    ``antiword`` tool.

    Args:
        filename: name of the DOC file; passed to ``antiword`` as an argument
        blob: binary DOC content, fed to ``antiword`` via stdin when no
            ``filename`` is given
        config: :class:`TextProcessingConfig` object; its ``width`` is passed
            to ``antiword`` via ``-w``

    Returns:
        the command output, as returned by ``get_cmd_output`` or
        ``get_cmd_output_from_stdin``

    Raises:
        AssertionError: if no ``antiword`` tool is registered in ``tools``
    """
    antiword_exe = tools["antiword"]
    if not antiword_exe:
        raise AssertionError("No DOC-reading tool available")

    cmdargs = [antiword_exe, "-w", str(config.width)]
    if filename:
        return get_cmd_output(*cmdargs, filename)
    # With a blob, "-" takes the place of the filename argument.
    return get_cmd_output_from_stdin(blob, *cmdargs, "-")
def availability_doc() -> bool:
    """
    Reports whether a DOC processor (``antiword``) is registered in
    ``tools``.
    """
    return True if tools["antiword"] else False
# =============================================================================
# EML
# =============================================================================


def convert_eml_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts an e-mail message in EML format to text.

    The message is parsed with :class:`email.parser.BytesParser` using
    ``policy.default``. The text obtained from the body and from each
    attachment is joined with newlines; parts that yield ``None`` (no
    converter for their content type) are skipped.

    Args:
        filename: name of the EML file to read
        blob: binary EML content (alternative to ``filename``)
        config: :class:`TextProcessingConfig` object, passed on to the
            converters used for the individual parts

    Returns:
        the newline-joined text of the convertible parts
    """
    email_content_list: list[str] = []

    with get_filelikeobject(filename, blob) as fp:
        parser = BytesParser(policy=policy.default)  # type: ignore[arg-type]
        message = parser.parse(fp)

        for email_content in _gen_email_content(message, config):
            if email_content is not None:
                email_content_list.append(email_content)

    text = "\n".join(email_content_list)

    return text


def _gen_email_content(
    message: EmailMessage, config: TextProcessingConfig
) -> Generator[Optional[str], None, None]:
    """
    Generates the text of an e-mail's body, then of each of its attachments.

    Args:
        message: the parsed e-mail
        config: :class:`TextProcessingConfig` object

    Yields:
        the result of :func:`_get_email_content` for the body (if any) and
        for every attachment; this may be ``None`` for parts that cannot be
        converted
    """
    # Ask for the HTML body first, falling back to plain text.
    body = message.get_body(
        preferencelist=(
            "html",
            "plain",
        )
    )  # type: ignore[attr-defined]
    if body is not None:
        yield _get_email_content(body, config)

    for part in message.iter_attachments():  # type: ignore[attr-defined]
        yield _get_email_content(part, config)


def _get_email_content(
    message: EmailMessage,
    config: TextProcessingConfig,
) -> Optional[str]:
    """
    Converts a single e-mail part to text, if a converter is known for it.

    The part's content type is mapped to a file extension with
    ``guess_extension``; if that extension is a key of ``ext_map``, the
    content is turned into bytes and handed to :func:`document_to_text`.

    Args:
        message: the e-mail part (body or attachment)
        config: :class:`TextProcessingConfig` object

    Returns:
        the converted text, or ``None`` if the content type does not map to
        an extension present in ``ext_map``
    """
    content_type = message.get_content_type()
    ext = guess_extension(content_type)

    if ext is not None and ext in ext_map:
        content = message.get_content()

        if isinstance(content, str):
            # Re-encode using the charset declared in the Content-Type
            # header, defaulting to UTF-8.
            charset = "utf-8"
            content_type_header = message.get("Content-Type")
            if content_type_header:
                charset = content_type_header.params.get("charset", "utf-8")
            blob = content.encode(charset, "replace")
        elif isinstance(content, EmailMessage):
            # The content is itself a message (e.g. an attached e-mail).
            blob = content.as_bytes()
            if message.get("Content-Transfer-Encoding") == "base64":
                blob = base64.b64decode(blob)
        else:
            blob = content

        return document_to_text(blob=blob, extension=ext, config=config)

    return None


# =============================================================================
# MSG (Outlook binary format)
# =============================================================================


def convert_msg_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts an Outlook MSG file to text.

    The message is opened with ``openMsg``; the text obtained from its body
    and from each convertible attachment is joined with newlines.

    Args:
        filename: name of the MSG file to read
        blob: binary MSG content (alternative to ``filename``)
        config: :class:`TextProcessingConfig` object, passed on to the
            converters used for the HTML body and the attachments

    Returns:
        the newline-joined text of the message contents

    Raises:
        ValueError: if neither ``filename`` nor ``blob`` is supplied, or if
            both are
    """
    message_content_list: list[str] = []

    if not filename and blob is None:
        raise ValueError("convert_msg_to_text: no filename and no blob")
    if filename and blob:
        raise ValueError(
            "convert_msg_to_text: specify either filename or blob"
        )

    # openMsg is given whichever of the two was supplied.
    if blob is not None:
        filename_or_blob = blob
    else:
        filename_or_blob = filename

    message = openMsg(filename_or_blob, delayAttachments=False)
    for message_content in _gen_msg_content(message, config=config):
        # Skip items that produced no text. (This previously tested the
        # list itself, which is never None, so nothing was filtered.)
        if message_content is not None:
            message_content_list.append(message_content)

    text = "\n".join(message_content_list)

    return text


def _gen_msg_content(
    message: "MSGFile", config: TextProcessingConfig
) -> Generator[Optional[str], None, None]:
    """
    Generates the text of an MSG file's body, then of its attachments.

    Args:
        message: the opened MSG file
        config: :class:`TextProcessingConfig` object

    Yields:
        ``message.body`` if it is not ``None``, otherwise the text converted
        from ``message.htmlBody`` (if that is not ``None``); then the
        converted text of every attachment whose extension is a key of
        ``ext_map``
    """
    if message.body is not None:
        yield message.body
    elif message.htmlBody is not None:
        yield document_to_text(
            blob=message.htmlBody, extension=".htm", config=config
        )

    for attachment in message.attachments:
        if (extension := getattr(attachment, "extension", None)) is not None:
            # null termination seen in the real world
            # https://github.com/TeamMsgExtractor/msg-extractor/issues/464
            extension = extension.replace("\x00", "")

            if extension in ext_map:
                yield document_to_text(
                    blob=attachment.data, extension=extension, config=config
                )


# =============================================================================
# Anything
# =============================================================================


# noinspection PyUnusedLocal
def convert_anything_to_text(
    filename: str = None,
    blob: bytes = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Fallback converter for arbitrary files, using ``strings`` or
    ``strings2``.

    (``strings`` is a standard Unix command to get text from any old
    rubbish.)

    Args:
        filename: name of the file; passed to the tool as an argument
        blob: binary content, fed to the tool via stdin when no ``filename``
            is given
        config: :class:`TextProcessingConfig` object (not used by this
            converter)

    Returns:
        the command output, as returned by ``get_cmd_output`` or
        ``get_cmd_output_from_stdin``

    Raises:
        AssertionError: if neither ``strings`` nor ``strings2`` is registered
            in ``tools``
    """
    strings_exe = tools["strings"] or tools["strings2"]
    if not strings_exe:
        raise AssertionError("No fallback string-reading tool available")

    if filename:
        return get_cmd_output(strings_exe, filename)
    return get_cmd_output_from_stdin(blob, strings_exe)
def availability_anything() -> bool:
    """
    Reports whether a generic "something-to-text" processor (``strings`` or
    ``strings2``) is registered in ``tools``.
    """
    return bool(tools["strings"] or tools["strings2"])
# ============================================================================= # Decider # ============================================================================= ext_map: dict[str, dict[str, Any]] = { # Converter functions must be of the form: func(filename, blob, config). # Availability must be either a boolean literal or a function that takes no # params. ".csv": {CONVERTER: get_file_contents_text, AVAILABILITY: True}, ".doc": {CONVERTER: convert_doc_to_text, AVAILABILITY: availability_doc}, ".docm": {CONVERTER: convert_docx_to_text, AVAILABILITY: True}, ".docx": {CONVERTER: convert_docx_to_text, AVAILABILITY: True}, ".dot": {CONVERTER: convert_doc_to_text, AVAILABILITY: availability_doc}, ".eml": {CONVERTER: convert_eml_to_text, AVAILABILITY: True}, ".htm": {CONVERTER: convert_html_to_text, AVAILABILITY: True}, ".html": {CONVERTER: convert_html_to_text, AVAILABILITY: True}, ".log": {CONVERTER: get_file_contents_text, AVAILABILITY: True}, ".msg": {CONVERTER: convert_msg_to_text, AVAILABILITY: True}, ".odt": {CONVERTER: convert_odt_to_text, AVAILABILITY: True}, ".pdf": {CONVERTER: convert_pdf_to_txt, AVAILABILITY: availability_pdf}, ".rtf": {CONVERTER: convert_rtf_to_text, AVAILABILITY: availability_rtf}, ".txt": {CONVERTER: get_file_contents_text, AVAILABILITY: True}, ".xml": {CONVERTER: convert_xml_to_text, AVAILABILITY: True}, None: { # fallback CONVERTER: convert_anything_to_text, AVAILABILITY: availability_anything, }, }
def document_to_text(
    filename: str = None,
    blob: bytes = None,
    extension: str = None,
    config: TextProcessingConfig = _DEFAULT_CONFIG,
) -> str:
    """
    Converts a document to text.

    This function selects a processor based on the file extension (either from
    the filename, or, in the case of a BLOB, the extension specified manually
    via the ``extension`` parameter).

    Pass either a filename or a binary object.

    Args:
        filename: the filename to read
        blob: binary content (alternative to ``filename``)
        extension: file extension, used as a hint when ``blob`` is used; a
            leading "." is added if absent, and case is ignored
        config: an optional :class:`TextProcessingConfig` object

    Returns:
        Returns a string if the file was processed (potentially an empty
        string).

    Raises:
        ValueError: for malformed arguments (neither or both of ``filename``
            and ``blob``; a blob without an ``extension`` hint) or if
            ``filename`` does not name an existing file. The selected
            converter may raise further exceptions.
    """
    if not filename and blob is None:
        raise ValueError("document_to_text: no filename and no blob")
    if filename and blob:
        raise ValueError("document_to_text: specify either filename or blob")
    # Without a filename we are working from a blob (checked above), and any
    # blob -- including an empty one -- needs an extension hint. Testing the
    # blob's truthiness here would let b"" through and crash on extension[0].
    if not filename and not extension:
        raise ValueError("document_to_text: need extension hint for blob")

    if filename:
        _, extension = os.path.splitext(filename)
    elif extension[0] != ".":
        extension = "." + extension
    extension = extension.lower()

    log.debug(
        f"filename: {filename}, blob type: {type(blob)}, "
        f"blob length: {len(blob) if blob is not None else None}, "
        f"extension: {extension}"
    )

    # If we were given a filename and the file doesn't exist, don't bother.
    if filename and not os.path.isfile(filename):
        raise ValueError(f"document_to_text: no such file: {filename!r}")

    # Choose method
    info = ext_map.get(extension)
    if info is None:
        # NOTE(review): this message relies on ``log`` accepting "{}"-style
        # format arguments -- confirm against the logger's definition.
        log.warning("Unknown filetype: {}; using generic tool", extension)
        info = ext_map[None]
    func = info[CONVERTER]
    text = func(filename, blob, config)
    if config.rstrip:
        text = rstrip_all_lines(text)
    return text
def is_text_extractor_available(extension: str) -> bool:
    """
    Reports whether a text extractor is available for the given extension.

    Args:
        extension: file extension as used for the keys of ``ext_map``
            (compared case-insensitively), or ``None`` for the fallback
            processor

    Returns:
        ``False`` if the extension is not in ``ext_map``; otherwise the
        entry's availability flag, or the result of calling its availability
        function

    Raises:
        ValueError: if the entry's availability is neither a bool nor
            callable
    """
    key = None if extension is None else extension.lower()

    entry = ext_map.get(key)
    if entry is None:
        return False

    available = entry[AVAILABILITY]
    if isinstance(available, bool):
        return available
    if not callable(available):
        raise ValueError(f"Bad information object for extension: {key}")
    return available()
def require_text_extractor(extension: str) -> None:
    """
    Checks that a text extractor is available for the given extension.

    Args:
        extension: file extension to check, as for
            :func:`is_text_extractor_available`

    Raises:
        ValueError: if no extractor is available for ``extension``
    """
    if is_text_extractor_available(extension):
        return
    raise ValueError(
        f"No text extractor available for extension: {extension}"
    )
# ============================================================================= # main, for command-line use # =============================================================================
def main() -> None:
    """
    Command-line processor. See ``--help`` for details.

    With ``--availability``, prints whether an extractor is present for each
    listed extension and exits. Otherwise converts ``inputfile`` to text and
    prints the result; if no input file is given, prints the help text to
    stderr.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("inputfile", nargs="?", help="Input file name")
    parser.add_argument(
        "--availability",
        nargs="*",
        help="File extensions to check availability for (use a '.' prefix, "
        "and use the special extension 'None' to check the fallback "
        "processor)",
    )
    parser.add_argument(
        "--plain",
        action="store_true",
        help="Use plainest format (re e.g. table layouts)",
    )
    parser.add_argument(
        "--semiplain",
        action="store_true",
        help="Use semi-plain format (re e.g. table layouts)",
    )
    parser.add_argument(
        "--width", type=int, default=DEFAULT_WIDTH, help="Word wrapping width"
    )
    parser.add_argument(
        "--min-col-width",
        type=int,
        default=DEFAULT_MIN_COL_WIDTH,
        help="Minimum column width for tables",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Be verbose",
    )
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    if args.availability:
        for ext in args.availability:
            # The literal word "none" (any case) selects the fallback entry.
            if ext.lower() == "none":
                ext = None
            available = is_text_extractor_available(ext)
            print(f"Extractor for extension {ext} present: {available}")
        return

    if not args.inputfile:
        parser.print_help(sys.stderr)
        return

    config = TextProcessingConfig(
        width=args.width,
        min_col_width=args.min_col_width,
        plain=args.plain,
        semiplain=args.semiplain,
    )
    result = document_to_text(filename=args.inputfile, config=config)
    if result is not None:
        print(result)
# Run the command-line processor when this file is executed as a script.
if __name__ == "__main__":
    main()

# *** antiword -w width