Source code for plugin_help.editor

#!/usr/bin/env python
# coding: utf-8

from __future__ import annotations

"""
This module provides some functions for modifying files in the 
`Sigil Ebook Editor <https://sigil-ebook.com/>` plug-ins.
"""

__author__  = "ChenyangGao <https://chenyanggao.github.io/>"
__version__ = (0, 1, 4)
__all__ = [
    "html_fromstring", "html_tostring", "xml_fromstring", "xml_tostring", 
    "IterMatchInfo", "re_iter", "re_sub", "WriteBack", "DoNotWriteBack", "edit", 
    "ctx_edit", "ctx_edit_sgml", "ctx_edit_html", "read_iter", "read_html_iter", 
    "edit_iter", "edit_batch", "edit_html_iter", "edit_html_batch", 
    "EditCache", "TextEditCache", 
]

import sys

from contextlib import contextmanager
from enum import Enum
from functools import partial
from re import compile as re_compile, Match, Pattern
from typing import (
    cast, Any, AnyStr, Callable, ContextManager, Dict, Final, 
    Generator, Iterable, Iterator, List, Mapping, MutableMapping, 
    NamedTuple, Optional, Tuple, TypeVar, Union, 
)
from types import MappingProxyType

try:
    from bookcontainer import BookContainer # type: ignore
except ImportError:
    pass

try:
    from plugin_util.pip_tool import ensure_install

    ensure_install("lxml")
    ensure_install("cssselect")

    from cssselect.xpath import GenericTranslator # type: ignore
    from lxml.cssselect import CSSSelector # type: ignore
    from lxml.etree import _Element as Element, XPath # type: ignore
except ImportError:
    from xml.etree.ElementTree import Element
    from plugin_util.htmlparser import ( # type: ignore
        html_fromstring, html_tostring, xml_fromstring, xml_tostring
    )
    _LXML_IMPORTED = False
else:
    from plugin_util.lxmlparser import ( # type: ignore
        html_fromstring, html_tostring, xml_fromstring, xml_tostring
    )
    _LXML_IMPORTED = True


T = TypeVar("T")
PatternType = Union[AnyStr, Pattern]


def _ensure_bc(
    bc: Optional[BookContainer] = None, 
    frame_back: int = 2, # positive integer
) -> BookContainer:
    """Helper function to guarantee that the return value is 
    `bookcontainer.BookContainer` type"""
    if isinstance(bc, BookContainer):
        return bc
    elif bc is None:
        try:
            bc = sys._getframe(frame_back).f_globals["bc"]
            if not isinstance(bc, BookContainer):
                raise TypeError
        except (KeyError, TypeError):
            import plugin_help as plugin
            bc = BookContainer(plugin.WRAPPER)
        return bc
    raise TypeError("Expected type %r, got %r" % (BookContainer, type(bc)))


[docs] class IterMatchInfo(NamedTuple): """Context information wrapper for regular expression matches. - **bc**: The ePub editor object `BookContainer`. `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. - **manifest_id**: The file's manifest id (listed in the OPF file). - **local_no**: Number in the current file (from 1). - **global_no**: Number in all files (from 1). - **file_no**: Number of processed files (from 1). - **href**: The file's OPF href. - **mimetype**: The file's media type. - **match**: The regular expression match object. - **string**: The content of the current file. """ bc: BookContainer manifest_id: str local_no: int # unsigned integer global_no: int # unsigned integer file_no: int # unsigned integer href: str mimetype: str match: Match string: Union[bytes, str]
[docs] def re_iter( pattern: PatternType, manifest_id_s: Union[None, str, Iterable[str]] = None, bc: Optional[BookContainer] = None, errors: str = "ignore", more_info: bool = False, ) -> Union[Generator[Match, None, None], Generator[IterMatchInfo, None, None]]: """Iterate over each of the files corresponding to the given manifest_id_s with regular expressions, and yield matches one by one. :param pattern: A regular expression pattern string or compiled object. :param manifest_id_s: Manifest id collection, are listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id If manifest_id_s is None (the default), it will get by `bc.text_iter()`. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :param errors: Strategies for errors, it can take a value in ("ignore", "raise", "skip"). - **ignore**: Ignore the error and continue processing, but the number will increase. - **skip**: Ignore the error and continue processing, the number will not increase. - **raise**: Raise the error and stop processing. :param more_info: If false, the yielding results are the match object of the regular expression, else the yielding results are the namedtuple `IterMatchInfo` objects, including the following fields: - **bc**: The ePub editor object. - **manifest_id**: The file's manifest id (listed in the OPF file) - **local_no**: Number in the current file (from 1) - **global_no**: Number in all files (from 1) - **file_no**: Number of processed files (from 1) - **href**: The file's OPF href - **mimetype**: The file's media type - **match**: The regular expression match object - **string**: The content of the current file :return: Generator, if `more_info` is True, then yield `IterMatchInfo` object, else yield `Element` object. :Examples: .. code-block:: python # Print all text node match objects one by one for text in re_iter(r"(?<=>)[^<]+"): print(text) """ bc = cast(BookContainer, _ensure_bc(bc)) fn: Callable = re_compile(pattern).finditer if manifest_id_s is None: manifest_id_s = (info[0] for info in bc.text_iter()) elif isinstance(manifest_id_s, str): manifest_id_s = (manifest_id_s,) if more_info: local_no: int = 1 global_no: int = 1 file_no: int = 1 for fid in manifest_id_s: href = bc.id_to_href(fid) mime = bc.id_to_mime(fid) try: string = bc.readfile(fid) local_no = 1 for match in fn(string): yield IterMatchInfo( bc, fid, local_no, global_no, file_no, href, mime, match, string) local_no += 1 global_no += 1 except: if errors == "skip": continue elif errors == "raise": raise file_no += 1 else: for fid in manifest_id_s: try: string = bc.readfile(fid) yield from fn(string) except: if errors == "raise": raise
[docs] def re_sub( pattern: PatternType, repl: Union[AnyStr, Callable[[Match], AnyStr], Callable[[IterMatchInfo], AnyStr]], manifest_id_s: Union[None, str, Iterable[str]] = None, bc: Optional[BookContainer] = None, errors: str = "ignore", more_info: bool = False, ) -> None: """Iterate over each of the files corresponding to the given manifest_id_s with regular expressions, and replace all matches. :param pattern: A regular expression pattern string or compiled object. :param repl: `repl` can be either a string or a callable. If it is a string, backslash escapes in it are processed. If it is a callable, it's passed the specified object (see param `more_info`) and must return a replacement string to be used. :param manifest_id_s: Manifest id collection, are listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id If manifest_id_s is None (the default), it will get by `bc.text_iter()`. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :param errors: Strategies for errors, it can take a value in ("ignore", "raise", "skip"). - **ignore**: Ignore the error and continue processing, but the number will increase. - **skip**: Ignore the error and continue processing, the number will not increase. - **raise**: Raise the error and stop processing. :param more_info: This parameter only takes effect when `repl` is a callable. If false, the argument was passed to the `repl` function is the match object of the regular expression, else the argument was passed to the `repl` function is the namedtuple `IterMatchInfo` object, including the following fields: - **bc**: The ePub editor object. - **manifest_id**: The file's manifest id (listed in the OPF file) - **local_no**: Number in the current file (from 1) - **global_no**: Number in all files (from 1) - **file_no**: Number of processed files (from 1) - **href**: The file's OPF href - **mimetype**: The file's media type - **match**: The regular expression match object - **string**: The content of the current file :Examples: .. code-block:: python # clear all text nodes" text re_sub(r"(?<=>)[^<]+", "") """ bc = cast(BookContainer, _ensure_bc(bc)) fn: Callable = re_compile(pattern).sub if manifest_id_s is None: manifest_id_s = (info[0] for info in bc.text_iter()) elif isinstance(manifest_id_s, str): manifest_id_s = (manifest_id_s,) if callable(repl): repl = cast(Callable[..., AnyStr], repl) local_no: int = 1 global_no: int = 1 file_no: int = 1 if more_info: def _repl(match): nonlocal local_no, global_no try: ret = repl(IterMatchInfo( bc, fid, local_no, global_no, file_no, href, mime, match, string)) except: if errors == "skip": global_no = old_global_no raise elif errors == "raise": raise else: local_no += 1 global_no += 1 return ret else: _repl = repl for fid in manifest_id_s: old_global_no = global_no local_no = 1 href = bc.id_to_href(fid) mime = bc.id_to_mime(fid) try: string = bc.readfile(fid) string_new = fn(_repl, string) if string != string_new: bc.writefile(fid, string_new) except: if errors == "skip": continue elif errors == "raise": raise file_no += 1 else: for fid in manifest_id_s: try: string = bc.readfile(fid) string_new = fn(repl, string) if string != string_new: bc.writefile(fid, string_new) except: if errors == "raise": raise
[docs] class WriteBack(Exception): """If changes require writing back to the file, you can raise this exception""" def __init__(self, data): self.data = data
[docs] class DoNotWriteBack(Exception): """If changes do not require writing back to the file, you can raise this exception"""
[docs] def edit( manifest_id: str, operate: Callable[..., Union[bytes, str]], bc: Optional[BookContainer] = None, ) -> bool: """Read the file data, operate on, and then write the changed data back :param manifest_id: Manifest id, is listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id :param operate: Take data in, operate on, and then return the changed data. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :return: Is it successful? """ bc = cast(BookContainer, _ensure_bc(bc)) content = bc.readfile(manifest_id) try: content_new = operate(content) except DoNotWriteBack: return False except WriteBack as exc: content_new = exc.data if content_new is None: return False if content != content_new: bc.writefile(manifest_id, content_new) return True return False
[docs] @contextmanager def ctx_edit( manifest_id: str, bc: Optional[BookContainer] = None, wrap_me: bool = False, ) -> Generator[Union[dict, bytes, str], None, bool]: """Read and yield the file data, and then take in and write back the changed data. :param manifest_id: Manifest id, is listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :param wrap_me: Whether to wrap up object, if True, return a dict containing keys ("manifest_id", "data", "write_back"). :return: A context manager that returns the `data` .. code-block:: python if wrap_me: data: dict = { "manifest_id": manifest_id, "data": bc.readfile(manifest_id), "write_back": True, } else: data: Union[bytes, str] = bc.readfile(manifest_id) :Examples: .. code-block:: python def operations_on_content(data_old): ... return data_new with ctx_edit(manifest_id, bc) as content: content_new = operations_on_content(content) # If you need writing back if you_need_writing_back: raise WriteBack(content_new) else: # If you don"t need writing back, just pass pass # OR equivalent to with ctx_edit(manifest_id, bc, wrap_me=True) as data: content = data["data"] content_new = operations_on_content(content) # If you need writing back if you_need_writing_back: data["data"] = content_new else: # If you don"t need writing back raise DoNotWriteBack # OR equivalent to: # data["write_back"] = False # OR equivalent to: ## del data["write_back"] # OR equivalent to: ## data["data"] = None # OR equivalent to: ## del data["data"] """ bc = cast(BookContainer, _ensure_bc(bc, 3)) content = bc.readfile(manifest_id) try: if wrap_me: data = { "manifest_id": manifest_id, "data": content, "write_back": True, } yield data if data.get("data") is None or not data.get("write_back"): raise DoNotWriteBack content_new = data["data"] else: yield content raise DoNotWriteBack except DoNotWriteBack: return False except WriteBack as exc: content_new = exc.data if content_new is None: return False if content != content_new: bc.writefile(manifest_id, content_new) return True return False
[docs] @contextmanager def ctx_edit_sgml( manifest_id: str, bc: Optional[BookContainer] = None, fromstring: Callable = xml_fromstring, tostring: Callable[..., Union[bytes, bytearray, str]] = xml_tostring, ) -> Generator[Any, Any, bool]: """Read and yield the etree object (parsed from a xml file), and then write back the above etree object. :param manifest_id: Manifest id, is listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :param fromstring: Parses an XML or SGML document or fragment from a string. Returns the root node (or the result returned by a parser target). :param tostring: Serialize an element to an encoded string representation of its XML or SGML tree. :Examples: .. code-block:: python def operations_on_etree(etree): ... with ctx_edit_sgml(manifest_id, bc) as etree: operations_on_etree(etree) # If you don"t need writing back ## raise DoNotWriteBack """ bc = cast(BookContainer, _ensure_bc(bc, 3)) content = bc.readfile(manifest_id) tree = fromstring(content.encode("utf-8")) try: yield tree except DoNotWriteBack: return False except WriteBack as exc: content_new = exc.data if content_new is None: return False elif not isinstance(content_new, (bytes, bytearray, str)): content_new = tostring(content_new) else: content_new = tostring(tree) if isinstance(content_new, (bytes, bytearray)): content_new = content_new.decode("utf-8") if content != content_new: bc.writefile(manifest_id, content_new) return True return False
[docs] @contextmanager def ctx_edit_html( manifest_id: str, bc: Optional[BookContainer] = None, ) -> Generator[Any, Any, bool]: """Read and yield the etree object (parsed from a (X)HTML file), and then write back the above etree object. :param manifest_id: Manifest id, is listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :Examples: .. code-block:: python def operations_on_etree(etree): ... with ctx_edit_html(manifest_id, bc) as etree: operations_on_etree(etree) # If you don"t need writing back ## raise DoNotWriteBack """ bc = cast(BookContainer, _ensure_bc(bc, 3)) return (yield from ctx_edit_sgml.__wrapped__( # type: ignore manifest_id, bc, html_fromstring, partial( html_tostring, method="xhtml" if "xhtml" in bc.id_to_mime(manifest_id) else "html", ), ))
[docs] def read_iter( manifest_id_s: Union[None, str, Iterable[str]] = None, bc: Optional[BookContainer] = None, ) -> Generator[Tuple[str, str, Union[bytes, str]], None, None]: """Iterate over the data of each manifest_id_s. :param manifest_id_s: Manifest id collection, are listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id If manifest_id_s is None (the default), it will get by `bc.manifest_iter()`. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. """ bc = cast(BookContainer, _ensure_bc(bc)) it: Iterable[Tuple[str, str]] if manifest_id_s is None: it = (info[:2] for info in bc.manifest_iter()) elif isinstance(manifest_id_s, str): it = (manifest_id_s, bc.id_to_href(manifest_id_s)), else: it = ((id, bc.id_to_href(id)) for id in manifest_id_s) for fid, href in it: yield fid, href, bc.readfile(fid)
[docs] def read_html_iter( manifest_id_s: Union[None, str, Iterable[str]] = None, bc: Optional[BookContainer] = None, ) -> Generator[Tuple[str, str, Element], None, None]: """Iterate over the data as (X)HTML etree object of each manifest_id_s. :param manifest_id_s: Manifest id collection, are listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id If manifest_id_s is None (the default), it will get by `bc.manifest_iter()`. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. """ bc = cast(BookContainer, _ensure_bc(bc)) it: Iterable[Tuple[str, str]] if manifest_id_s is None: it = (info[:2] for info in bc.text_iter()) elif isinstance(manifest_id_s, str): it = (manifest_id_s, bc.id_to_href(manifest_id_s)), else: it = ((id, bc.id_to_href(id)) for id in manifest_id_s) for fid, href in it: yield fid, href, html_fromstring(bc.readfile(fid).encode("utf-8"))
[docs] def edit_iter( manifest_id_s: Union[None, str, Iterable[str]] = None, bc: Optional[BookContainer] = None, wrap_me: bool = False, yield_cm: bool = False, ): """Used to process a collection of specified files in ePub file one by one :param manifest_id_s: Manifest id collection, are listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id If manifest_id_s is None (the default), it will get by `bc.manifest_iter()`. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :param wrap_me: Will pass to function ctx_edit as keyword argument. :param yield_cm: Determines whether each iteration returns the context manager. :Examples: .. code-block:: python def operations_on_content(data_old): ... return data_new edit_worker = edit_iter(manifest_id_s, bc) for fid, content in edit_worker: content_new = operations_on_content(content) # **NOTE**: `content_new` can"t be None if you_need_writing_back: edit_worker.send(content_new) else: # If you don"t need writing back, just pass pass # OR equivalent to for fid, data in edit_iter(manifest_id_s, bc, wrap_me=True): content = data["data"] content_new = operations_on_content() if you_need_writing_back: data["data"] = content_new else: # If you don"t need writing back data["write_back"] = False # OR equivalent to: ## del data["write_back"] # OR equivalent to: ## data["data"] = None # OR equivalent to: ## del data["data"] # OR equivalent to for fid, cm in edit_iter(manifest_id_s, bc, yield_cm=True): with cm as content: content_new = operations_on_content() if you_need_writing_back: raise WriteBack(content_new) else: # If you don"t need writing back, just pass pass # OR equivalent to: ## raise DoNotWriteBack """ bc = cast(BookContainer, _ensure_bc(bc)) if manifest_id_s is None: manifest_id_s = (info[0] for info in bc.manifest_iter()) elif isinstance(manifest_id_s, str): manifest_id_s = (manifest_id_s,) for fid in manifest_id_s: if yield_cm: yield fid, ctx_edit(fid, bc, wrap_me=wrap_me) else: with ctx_edit(fid, bc, wrap_me=wrap_me) as data: recv_data = yield fid, data if recv_data is not None: while True: send_data = recv_data recv_data = yield if recv_data is None: raise WriteBack(send_data)
class SuccessStatus(NamedTuple): """This class is used to illustrate the running result """ manifest_id: str is_success: bool = True error: Optional[Exception] = None
[docs] def edit_batch( operate: Callable, manifest_id_s: Union[None, str, Iterable[str]] = None, bc: Optional[BookContainer] = None, ) -> List[SuccessStatus]: """Used to process a collection of specified files in ePub file one by one :param manifest_id_s: Manifest id collection, are listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id If manifest_id_s is None (the default), it will get by `bc.manifest_iter()`. :param operate: Take data in, operate on, and then return the changed data. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :return: List of tuples of success status. :Examples: .. code-block:: python def operations_on_content(data_old): ... return data_new edit_batch(operations_on_content, manifest_id_s, bc) """ bc = cast(BookContainer, _ensure_bc(bc)) if manifest_id_s is None: manifest_id_s = (info[0] for info in bc.manifest_iter()) elif isinstance(manifest_id_s, str): manifest_id_s = (manifest_id_s,) success_status: List[SuccessStatus] = [] for fid in manifest_id_s: try: with ctx_edit(fid, bc) as content: raise WriteBack(operate(content)) success_status.append(SuccessStatus(fid)) except Exception as exc: success_status.append(SuccessStatus(fid, False, exc)) return success_status
[docs] def edit_html_iter( manifest_id_s: Union[None, str, Iterable[str]] = None, bc: Optional[BookContainer] = None, wrap_me: bool = False, yield_cm: bool = False, ): """Used to process a collection of specified (X)HTML files in ePub file one by one :param manifest_id_s: Manifest id collection, are listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id If manifest_id_s is None (the default), it will get by `bc.text_iter()`. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :param wrap_me: Whether to wrap up object, if True, return a dict containing keys ("manifest_id", "data", "write_back") :param yield_cm: Determines whether each iteration returns the context manager. :Examples: .. code-block:: python def operations_on_etree(etree): ... edit_worker = edit_html_iter(manifest_id_s, bc) for fid, etree in edit_worker: operations_on_etree(etree) # If you don"t need writing back ## edit_worker.throw(DoNotWriteBack) # OR equivalent to for fid, data in edit_html_iter(manifest_id_s, bc, wrap_me=True): operations_on_etree(data["etree"]) # If you don"t need writing back ## data["write_back"] = False # OR equivalent to: ## del data["write_back"] # OR equivalent to: ## data["data"] = None # OR equivalent to: ## del data["data"] # OR equivalent to for fid, cm in edit_html_iter(manifest_id_s, bc, yield_cm=True): with cm as etree: operations_on_etree(etree) # If you don"t need writing back ## raise DoNotWriteBack """ bc = cast(BookContainer, _ensure_bc(bc)) if manifest_id_s is None: manifest_id_s = (info[0] for info in bc.text_iter()) elif isinstance(manifest_id_s, str): manifest_id_s = (manifest_id_s,) for fid in manifest_id_s: if yield_cm: yield fid, ctx_edit_html(fid, bc) else: with ctx_edit_html(fid, bc) as tree: if wrap_me: data = { "manifest_id": fid, "data": tree, "write_back": True, } recv_data = yield fid, data if recv_data is None: if data.get("data") is None or not data.get("write_back"): raise DoNotWriteBack raise WriteBack(data["data"]) else: recv_data = yield fid, tree if recv_data is not None: while True: send_data = recv_data recv_data = yield if recv_data is None: raise WriteBack(send_data)
[docs] def edit_html_batch( operate: Callable[[Element], Any], manifest_id_s: Union[None, str, Iterable[str]] = None, bc: Optional[BookContainer] = None, ) -> List[SuccessStatus]: """Used to process a collection of specified (X)HTML files in ePub file one by one :param operate: Take etree object in, operate on. :param manifest_id_s: Manifest id collection, are listed in OPF file, The XPath as following (the `namespace` depends on the specific situation): /namespace:package/namespace:manifest/namespace:item/@id If manifest_id_s is None (the default), it will get by `bc.text_iter()`. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :return: List of tuples of success status. :Examples: .. code-block:: python def operations_on_etree(etree): ... edit_html_batch(operations_on_etree, manifest_id_s, bc) """ bc = cast(BookContainer, _ensure_bc(bc)) if manifest_id_s is None: manifest_id_s = (info[0] for info in bc.text_iter()) elif isinstance(manifest_id_s, str): manifest_id_s = (manifest_id_s,) success_status: List[SuccessStatus] = [] for fid in manifest_id_s: try: with ctx_edit_html(fid, bc) as tree: operate(tree) success_status.append(SuccessStatus(fid)) except Exception as exc: success_status.append(SuccessStatus(fid, False, exc)) return success_status
if _LXML_IMPORTED:
[docs] class IterElementInfo(NamedTuple): """The wrapper for the output tuple, contains the following fields - **bc**: The ePub editor object `BookContainer` - **manifest_id**: The file's manifest id (listed in the OPF file) - **local_no**: Number in the current file (from 1) - **global_no**: Number in all files (from 1) - **file_no**: Number of processed files (from 1) - **href**: OPF href - **mimetype**: Media type - **element**: (X)HTML element object - **etree**: (X)HTML tree object """ bc: BookContainer manifest_id: str local_no: int # unsigned integer global_no: int # unsigned integer file_no: int # unsigned integer href: str mimetype: str element: Element etree: Element
[docs] class EnumSelectorType(Enum): """Selector type enumeration. .xpath: Indicates that the selector type is XPath. .cssselect: Indicates that the selector type is CSS Selector. """ xpath = 1 XPath = 1 cssselect = 2 CSS_Selector = 2 @classmethod def of(enum_cls, value): val_cls = type(value) if val_cls is enum_cls: return value elif issubclass(val_cls, int): return enum_cls(value) elif issubclass(val_cls, str): try: return enum_cls[value] except KeyError as exc: raise ValueError(value) from exc raise TypeError(f"expected value's type in ({enum_cls!r}" f", int, str), got {val_cls}")
[docs] def element_iter( path: Union[str, XPath] = "descendant-or-self::*", bc: Optional[BookContainer] = None, seltype: Union[int, str, EnumSelectorType] = EnumSelectorType.cssselect, namespaces: Optional[Mapping] = None, translator: Union[str, GenericTranslator] = "xml", more_info: bool = False, ) -> Union[Generator[Element, None, None], Generator[IterElementInfo, None, None]]: """Traverse all (X)HTML files in epub, search the elements that match the path, and return the relevant information of these elements one by one. :param path: A XPath expression or CSS Selector expression. If its `type` is `str`, then it is a XPath expression or CSS Selector expression determined by `seltype`. If its type is a subclass of "lxml.etree.XPath"`, then parameters `seltype`, `namespaces`, `translator` are ignored. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :param seltype: Selector type. It can be any value that can be accepted by `EnumSelectorType.of`, the return value called final value. If its final value is `EnumSelectorType.xpath`, then parameter `translator` is ignored. :param namespaces: Prefix-namespace mappings used by `path`. To use CSS namespaces, you need to pass a prefix-to-namespace mapping as `namespaces` keyword argument:: >>> from lxml import cssselect, etree >>> rdfns = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" >>> select_ns = cssselect.CSSSelector("root > rdf|Description", ... namespaces={"rdf": rdfns}) >>> rdf = etree.XML(( ... "<root xmlns:rdf="%s">" ... "<rdf:Description>blah</rdf:Description>" ... "</root>") % rdfns) >>> [(el.tag, el.text) for el in select_ns(rdf)] [("{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Description", "blah")] :param translator: A CSS Selector expression to XPath expression translator object. :param more_info: Determine whether to wrap the yielding results. If false, the yielding results are the match objects of the `path` expression, else are the namedtuple `IterElementInfo` objects (with some context information). :return: Generator, if `more_info` is True, then yield `IterElementInfo` object, else yield `Element` object. :Examples: .. code-block:: python def operations_on_element(element): ... for info in element_iter(css_selector, bc): operations_on_element(element) # OR equivalent to for element in element_iter(css_selector, bc, more_info=True): operations_on_element(info.element) """ select: XPath if isinstance(path, str): if EnumSelectorType.of(seltype) is EnumSelectorType.cssselect: select = CSSSelector( path, namespaces=namespaces, translator=translator) else: select = XPath(path, namespaces=namespaces) else: select = path bc = cast(BookContainer, _ensure_bc(bc)) global_no: int = 0 data: dict for file_no, (fid, tree) in enumerate(edit_html_iter(bc=bc), 1): # type: ignore href = bc.id_to_href(fid) mime = bc.id_to_mime(fid) els = select(tree) if not els: del data["write_back"] continue if more_info: for local_no, (global_no, el) in enumerate(enumerate(els, global_no + 1), 1): yield IterElementInfo( bc, fid, local_no, global_no, file_no, href, mime, el, tree) else: yield from els
__all__.extend(("IterElementInfo", "EnumSelectorType", "element_iter"))
[docs] class EditCache(MutableMapping[str, T]): """Initialize an `EditCache` object that can proxy accessing to `bookcontainer.Bookcontainer` object. The edited files" data of this `EditCache` object are cached and not immediately written back to the `bookcontainer.Bookcontainer` object, until the `__exit__` method or the `clear` method are called, and then this `EditCache` object is cleared. **NOTE**: This can operate all the files declared in the OPF file in ePub. **NOTE**: A manifest id is available or not, can be determined by `__contains__` method. **NOTE**: If you need to directly operate on the corresponding `bookcontainer.Bookcontainer` object (e.g., delete a file), please clear this editcache first. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :Examples: .. code-block:: python # Change "utf-8" to "UTF-8" in all (X)HTML texts .. code-block:: python with EditCache(bc) as cache: for fid, *_ in cache.bookcontainer.text_iter(): cache[fid] = cache[fid].replace("utf-8", "UTF-8") """ __context_factory__: Callable[[str, BookContainer], ContextManager] = lambda fid, bc: ctx_edit(fid, bc) def __init__(self, bc: Optional[BookContainer] = None) -> None: bc = cast(BookContainer, _ensure_bc(bc)) self._exit_cbs: Dict[str, Tuple[ContextManager, Callable]]= {} self._data: Dict[str, T] = {} self._bc: BookContainer = bc @contextmanager def _cm(self, fid: str, bc: BookContainer, /) -> Generator[T, None, None]: with type(self).__context_factory__(fid, bc) as data: yield data if fid in self._data: raise WriteBack(self._data[fid]) else: raise DoNotWriteBack @property def data(self) -> MappingProxyType: """A dictionary as a set of [file's manifest id]: [file data object] pairs.""" return MappingProxyType(self._data) @property def bookcontainer(self) -> BookContainer: """Internal `BookContainer` object. `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. """ return self._bc bc = bk = bookcontainer def __contains__(self, fid): "Determine whether `fid` is an available manifest id." return fid in self._bc._w.id_to_mime def __len__(self) -> int: """Count of all available [files" manifest ids].""" return len(self._bc._w.id_to_mime) def __iter__(self) -> Iterator[str]: """Iterate over all available [files" manifest ids] (roughly from `bookcontainer.Bookcontainer.manifest_iter`).""" yield from self._bc._w.id_to_mime
[docs] def iteritems(self) -> Iterator[Tuple[str, T]]: """Iterate over all files (manifest ids are offered by `__iter__` method), and yield a tuple of [file's manifest id] and [file data object] (this will cause the file to be opened) at each time""" for fid in self: yield fid, self[fid]
[docs] def itervalues(self) -> Iterator[T]: """Iterate over all files (manifest ids are offered by `__iter__` method), and yield [file data object] (this will cause the file to be opened) at each time""" for fid in self: yield self[fid]
def __enter__(self): return self def __exit__(self, *exc_info): "Write all opened files back, and clear the `EditCache` object." try: received_exc = exc_info[0] is not None # We manipulate the exception state so it behaves as though # we were actually nesting multiple with statements frame_exc = sys.exc_info()[1] def _fix_exception_context(new_exc, old_exc): # Context may not be correct, so find the end of the chain while True: exc_context = new_exc.__context__ if exc_context is old_exc: # Context is already set correctly (see issue 20317) return if exc_context is None or exc_context is frame_exc: break new_exc = exc_context # Change the end of the chain to point to the exception # we expect it to reference new_exc.__context__ = old_exc # Callbacks are invoked in LIFO order to match the behaviour of # nested context managers suppressed_exc = False pending_raise = False for cm, cm_exit in self._exit_cbs.values(): try: if cm_exit(cm, *exc_info): suppressed_exc = True pending_raise = False exc_info = (None, None, None) except: new_exc_info = sys.exc_info() # simulate the stack of exceptions by setting the context _fix_exception_context(new_exc_info[1], exc_info[1]) pending_raise = True exc_info = new_exc_info if pending_raise: # bare "raise exc_info[1]" replaces our carefully set-up context fixed_ctx = exc_info[1].__context__ try: raise exc_info[1] except BaseException: exc_info[1].__context__ = fixed_ctx raise return received_exc and suppressed_exc finally: self._data.clear() self._exit_cbs.clear()
[docs] def clear(self) -> None: "Write all opened files back, and clear the `EditCache` object." self.__exit__(*sys.exc_info())
__del__ = clear def __getitem__(self, fid) -> T: """Receive a file's manifest id `fid`, return the corresponding of the file data object, otherwise raise `KeyError`.""" data = self._data if fid not in data: try: cm = type(self)._cm(self, fid, self._bc) cm_type = type(cm) data[fid] = cm_type.__enter__(cm) # type: ignore self._exit_cbs[fid] = (cm, cm_type.__exit__) except Exception as exc: raise KeyError(fid) from exc return data[fid] def __setitem__(self, fid, data) -> None: """Update the data of the corresponding manifest id `fid` to `data`. There are 2 restrictions: 1. The manifest id `fid` must be available, otherwise raise `KeyError`. 2. The data type of `data` must the same as original data, otherwise raise `TypeError`. """ original_type = type(self._data[fid]) data_type = type(data) if original_type is not data_type: raise TypeError( "The data type does not match. It must be the same as the data type of " "the original data, expected %r, got %r." % (original_type, data_type)) self._data[fid] = data def __delitem__(self, fid) -> None: """If the manifest id `fid` available and the corresponding data were modified, then clear the modified data.""" if fid in self._data: del self._data[fid] cm, cm_exit = self._exit_cbs.pop(fid) try: raise DoNotWriteBack except: cm_exit(cm, *sys.exc_info())
[docs] def read_id(self, key) -> T: """Receive a file's manifest id, return the content of the file, otherwise raise `KeyError`.""" return self[key]
[docs] def read_href(self, key) -> T: """Receive a file's OPF href, return the content of the file, otherwise raise `KeyError`""" try: return self[self._bc.href_to_id(key)] except Exception as exc: raise KeyError(key) from exc
[docs] def read_basename(self, key) -> T: """Receive a file's basename (with extension), return the content of the file, otherwise raise `KeyError`""" try: return self[self._bc.basename_to_id(key)] except Exception as exc: raise KeyError(key) from exc
[docs] def read_bookpath(self, key) -> T: """Receive a file's bookpath (aka "book_href" aka "bookhref"), return the content of the file, otherwise raise `KeyError`""" try: return self[self._bc.bookpath_to_id(key)] except Exception as exc: raise KeyError(key) from exc
[docs] class TextEditCache(EditCache[T]): """Initialize an `TextEditCache` object that can proxy accessing to `bookcontainer.Bookcontainer` object. The edited files" data of this `TextEditCache` object are cached and not immediately written back to the `bookcontainer.Bookcontainer` object, until the `__exit__` method or the `clear` method are called, and then this `TextEditCache` object is cleared. **NOTE**: This can operate all the text (HTML / XHTML only) files declared in the OPF file in ePub. **NOTE**: A manifest id is available or not, can be determined by `__contains__` method. **NOTE**: If you need to directly operate on the corresponding `bookcontainer.Bookcontainer` object (e.g., delete a file), please clear this editcache first. :param bc: `BookContainer` object. If it is None (the default), will be found in caller's globals(). `BookContainer` object is an object of ePub book content provided by Sigil, which can be used to access and operate the files in ePub. :Examples: .. code-block:: python # Delete the first <title> element that appears in each (X)HTML etree with TextEditCache(bc) as cache: for fid, tree in cache.iteritems(): el_title = tree.find(".//title") if el_title is not None: el_title.getparent().remove(el_title) """ __context_factory__: Callable[[str, BookContainer], ContextManager] = ctx_edit_html def __contains__(self, fid): "Determine whether `fid` is an available manifest id." return fid in iter(self) def __len__(self) -> int: """Count of all available [files" manifest ids] (HTML / XHTML only).""" return sum(1 for _ in self._bc.text_iter()) def __iter__(self) -> Iterator[str]: """Iterate over all available [files" manifest ids] (HTML / XHTML only) (from `bookcontainer.Bookcontainer.text_iter`).""" for fid, *_ in self._bc.text_iter(): yield fid