""" Validator for Word document XML files against XSD schemas. """ import random import re import tempfile import zipfile import defusedxml.minidom import lxml.etree from .base import BaseSchemaValidator class DOCXSchemaValidator(BaseSchemaValidator): WORD_2006_NAMESPACE = "http://schemas.openxmlformats.org/wordprocessingml/2006/main" W14_NAMESPACE = "http://schemas.microsoft.com/office/word/2010/wordml" W16CID_NAMESPACE = "http://schemas.microsoft.com/office/word/2016/wordml/cid" ELEMENT_RELATIONSHIP_TYPES = {} def validate(self): if not self.validate_xml(): return False all_valid = True if not self.validate_namespaces(): all_valid = False if not self.validate_unique_ids(): all_valid = False if not self.validate_file_references(): all_valid = False if not self.validate_content_types(): all_valid = False if not self.validate_against_xsd(): all_valid = False if not self.validate_whitespace_preservation(): all_valid = False if not self.validate_deletions(): all_valid = False if not self.validate_insertions(): all_valid = False if not self.validate_all_relationship_ids(): all_valid = False if not self.validate_id_constraints(): all_valid = False if not self.validate_comment_markers(): all_valid = False self.compare_paragraph_counts() return all_valid def validate_whitespace_preservation(self): errors = [] for xml_file in self.xml_files: if xml_file.name != "document.xml": continue try: root = lxml.etree.parse(str(xml_file)).getroot() for elem in root.iter(f"{{{self.WORD_2006_NAMESPACE}}}t"): if elem.text: text = elem.text if re.search(r"^[ \t\n\r]", text) or re.search( r"[ \t\n\r]$", text ): xml_space_attr = f"{{{self.XML_NAMESPACE}}}space" if ( xml_space_attr not in elem.attrib or elem.attrib[xml_space_attr] != "preserve" ): text_preview = ( repr(text)[:50] + "..." if len(repr(text)) > 50 else repr(text) ) errors.append( f" {xml_file.relative_to(self.unpacked_dir)}: " f"Line {elem.sourceline}: w:t element with whitespace missing xml:space='preserve': {text_preview}" ) except (lxml.etree.XMLSyntaxError, Exception) as e: errors.append( f" {xml_file.relative_to(self.unpacked_dir)}: Error: {e}" ) if errors: print(f"FAILED - Found {len(errors)} whitespace preservation violations:") for error in errors: print(error) return False else: if self.verbose: print("PASSED - All whitespace is properly preserved") return True def validate_deletions(self): errors = [] for xml_file in self.xml_files: if xml_file.name != "document.xml": continue try: root = lxml.etree.parse(str(xml_file)).getroot() namespaces = {"w": self.WORD_2006_NAMESPACE} for t_elem in root.xpath(".//w:del//w:t", namespaces=namespaces): if t_elem.text: text_preview = ( repr(t_elem.text)[:50] + "..." if len(repr(t_elem.text)) > 50 else repr(t_elem.text) ) errors.append( f" {xml_file.relative_to(self.unpacked_dir)}: " f"Line {t_elem.sourceline}: found within : {text_preview}" ) for instr_elem in root.xpath( ".//w:del//w:instrText", namespaces=namespaces ): text_preview = ( repr(instr_elem.text or "")[:50] + "..." if len(repr(instr_elem.text or "")) > 50 else repr(instr_elem.text or "") ) errors.append( f" {xml_file.relative_to(self.unpacked_dir)}: " f"Line {instr_elem.sourceline}: found within (use ): {text_preview}" ) except (lxml.etree.XMLSyntaxError, Exception) as e: errors.append( f" {xml_file.relative_to(self.unpacked_dir)}: Error: {e}" ) if errors: print(f"FAILED - Found {len(errors)} deletion validation violations:") for error in errors: print(error) return False else: if self.verbose: print("PASSED - No w:t elements found within w:del elements") return True def count_paragraphs_in_unpacked(self): count = 0 for xml_file in self.xml_files: if xml_file.name != "document.xml": continue try: root = lxml.etree.parse(str(xml_file)).getroot() paragraphs = root.findall(f".//{{{self.WORD_2006_NAMESPACE}}}p") count = len(paragraphs) except Exception as e: print(f"Error counting paragraphs in unpacked document: {e}") return count def count_paragraphs_in_original(self): original = self.original_file if original is None: return 0 count = 0 try: with tempfile.TemporaryDirectory() as temp_dir: with zipfile.ZipFile(original, "r") as zip_ref: zip_ref.extractall(temp_dir) doc_xml_path = temp_dir + "/word/document.xml" root = lxml.etree.parse(doc_xml_path).getroot() paragraphs = root.findall(f".//{{{self.WORD_2006_NAMESPACE}}}p") count = len(paragraphs) except Exception as e: print(f"Error counting paragraphs in original document: {e}") return count def validate_insertions(self): errors = [] for xml_file in self.xml_files: if xml_file.name != "document.xml": continue try: root = lxml.etree.parse(str(xml_file)).getroot() namespaces = {"w": self.WORD_2006_NAMESPACE} invalid_elements = root.xpath( ".//w:ins//w:delText[not(ancestor::w:del)]", namespaces=namespaces ) for elem in invalid_elements: text_preview = ( repr(elem.text or "")[:50] + "..." if len(repr(elem.text or "")) > 50 else repr(elem.text or "") ) errors.append( f" {xml_file.relative_to(self.unpacked_dir)}: " f"Line {elem.sourceline}: within : {text_preview}" ) except (lxml.etree.XMLSyntaxError, Exception) as e: errors.append( f" {xml_file.relative_to(self.unpacked_dir)}: Error: {e}" ) if errors: print(f"FAILED - Found {len(errors)} insertion validation violations:") for error in errors: print(error) return False else: if self.verbose: print("PASSED - No w:delText elements within w:ins elements") return True def compare_paragraph_counts(self): original_count = self.count_paragraphs_in_original() new_count = self.count_paragraphs_in_unpacked() diff = new_count - original_count diff_str = f"+{diff}" if diff > 0 else str(diff) print(f"\nParagraphs: {original_count} → {new_count} ({diff_str})") def _parse_id_value(self, val: str, base: int = 16) -> int: return int(val, base) def validate_id_constraints(self): errors = [] para_id_attr = f"{{{self.W14_NAMESPACE}}}paraId" durable_id_attr = f"{{{self.W16CID_NAMESPACE}}}durableId" for xml_file in self.xml_files: try: for elem in lxml.etree.parse(str(xml_file)).iter(): if val := elem.get(para_id_attr): if self._parse_id_value(val, base=16) >= 0x80000000: errors.append( f" {xml_file.name}:{elem.sourceline}: paraId={val} >= 0x80000000" ) if val := elem.get(durable_id_attr): if xml_file.name == "numbering.xml": try: if self._parse_id_value(val, base=10) >= 0x7FFFFFFF: errors.append( f" {xml_file.name}:{elem.sourceline}: " f"durableId={val} >= 0x7FFFFFFF" ) except ValueError: errors.append( f" {xml_file.name}:{elem.sourceline}: " f"durableId={val} must be decimal in numbering.xml" ) else: if self._parse_id_value(val, base=16) >= 0x7FFFFFFF: errors.append( f" {xml_file.name}:{elem.sourceline}: " f"durableId={val} >= 0x7FFFFFFF" ) except Exception: pass if errors: print(f"FAILED - {len(errors)} ID constraint violations:") for e in errors: print(e) elif self.verbose: print("PASSED - All paraId/durableId values within constraints") return not errors def validate_comment_markers(self): errors = [] document_xml = None comments_xml = None for xml_file in self.xml_files: if xml_file.name == "document.xml" and "word" in str(xml_file): document_xml = xml_file elif xml_file.name == "comments.xml": comments_xml = xml_file if not document_xml: if self.verbose: print("PASSED - No document.xml found (skipping comment validation)") return True try: doc_root = lxml.etree.parse(str(document_xml)).getroot() namespaces = {"w": self.WORD_2006_NAMESPACE} range_starts = { elem.get(f"{{{self.WORD_2006_NAMESPACE}}}id") for elem in doc_root.xpath( ".//w:commentRangeStart", namespaces=namespaces ) } range_ends = { elem.get(f"{{{self.WORD_2006_NAMESPACE}}}id") for elem in doc_root.xpath( ".//w:commentRangeEnd", namespaces=namespaces ) } references = { elem.get(f"{{{self.WORD_2006_NAMESPACE}}}id") for elem in doc_root.xpath( ".//w:commentReference", namespaces=namespaces ) } orphaned_ends = range_ends - range_starts for comment_id in sorted( orphaned_ends, key=lambda x: int(x) if x and x.isdigit() else 0 ): errors.append( f' document.xml: commentRangeEnd id="{comment_id}" has no matching commentRangeStart' ) orphaned_starts = range_starts - range_ends for comment_id in sorted( orphaned_starts, key=lambda x: int(x) if x and x.isdigit() else 0 ): errors.append( f' document.xml: commentRangeStart id="{comment_id}" has no matching commentRangeEnd' ) comment_ids = set() if comments_xml and comments_xml.exists(): comments_root = lxml.etree.parse(str(comments_xml)).getroot() comment_ids = { elem.get(f"{{{self.WORD_2006_NAMESPACE}}}id") for elem in comments_root.xpath( ".//w:comment", namespaces=namespaces ) } marker_ids = range_starts | range_ends | references invalid_refs = marker_ids - comment_ids for comment_id in sorted( invalid_refs, key=lambda x: int(x) if x and x.isdigit() else 0 ): if comment_id: errors.append( f' document.xml: marker id="{comment_id}" references non-existent comment' ) except (lxml.etree.XMLSyntaxError, Exception) as e: errors.append(f" Error parsing XML: {e}") if errors: print(f"FAILED - {len(errors)} comment marker violations:") for error in errors: print(error) return False else: if self.verbose: print("PASSED - All comment markers properly paired") return True def repair(self) -> int: repairs = super().repair() repairs += self.repair_durableId() return repairs def repair_durableId(self) -> int: repairs = 0 for xml_file in self.xml_files: try: content = xml_file.read_text(encoding="utf-8") dom = defusedxml.minidom.parseString(content) modified = False for elem in dom.getElementsByTagName("*"): if not elem.hasAttribute("w16cid:durableId"): continue durable_id = elem.getAttribute("w16cid:durableId") needs_repair = False if xml_file.name == "numbering.xml": try: needs_repair = ( self._parse_id_value(durable_id, base=10) >= 0x7FFFFFFF ) except ValueError: needs_repair = True else: try: needs_repair = ( self._parse_id_value(durable_id, base=16) >= 0x7FFFFFFF ) except ValueError: needs_repair = True if needs_repair: value = random.randint(1, 0x7FFFFFFE) if xml_file.name == "numbering.xml": new_id = str(value) else: new_id = f"{value:08X}" elem.setAttribute("w16cid:durableId", new_id) print( f" Repaired: {xml_file.name}: durableId {durable_id} → {new_id}" ) repairs += 1 modified = True if modified: xml_file.write_bytes(dom.toxml(encoding="UTF-8")) except Exception: pass return repairs if __name__ == "__main__": raise RuntimeError("This module should not be run directly.")