Drop XML parsing, switch to using dnf excludepkgs (#5)

Instead of painfully retrieving, decompressing, parsing, editing
and reconstructing the repodata, let's use dnf's excludepkgs
option to achieve the same thing. This is much simpler, faster,
and more robust.

To 'modify' the modified base repos, we use dnf repoquery to
list the packages they contain and their source RPM names, then
construct a table with the NEVRs to be removed for each repo.
Then when we do the repoclosure command, we pass
--setopt <repo>.excludepkgs=<nevrlist> for each modified repo.
This tells dnf to act as if the specified NEVRs simply do not
exist in the specified repo, which is exactly what we want to
achieve.

Signed-off-by: Adam Williamson <awilliam@redhat.com>
Adam Williamson 2026-04-17 14:57:47 -07:00
commit 3a96888ca9
6 changed files with 81 additions and 587 deletions

@ -2,12 +2,12 @@
rmdepcheck is an RPM dependency check tool based on a repository metadata modification approach.
It works by comparing a checked repository to one or more base repositories. First, checks are run
on the base repositories as-is. Next, modified copies of the base repositories' metadata is
created, with all packages from the same source RPM(s) as the package(s) in the checked
repositories removed. Finally, checks are run again on the modified base repositories, with the
checked repositories available to the dependency solver. The results of the two runs are compared.
New failures should indicate problems introduced by the checked repositories. Also, some relevant
checks are run on the checked repositories with reference to the modified base repositories.
on the base repositories as-is. Next, we re-run the checks, but with the checked repositories
available and using dnf's `excludepkgs` option to hide from the base repositories all packages from
the same source RPM(s) as the package(s) in the checked repositories. The results of the
two runs are compared. New failures should indicate problems introduced by the checked repositories.
Also, some relevant checks are run on the checked repositories with reference to the modified base
repositories.
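The comparison of the two runs can be sketched roughly as below. This is a minimal illustration, not the tool's actual code: the function name `compare_runs` is hypothetical, and each failure is assumed to be a three-string tuple (the meaning of the individual fields is not relied upon here).

```python
# Hypothetical sketch: each dependency failure is assumed to be a 3-tuple of strings.
DepTuple = tuple[str, str, str]


def compare_runs(
    base: list[DepTuple], modified: list[DepTuple]
) -> tuple[list[DepTuple], list[DepTuple]]:
    """Return (new failures, fixed failures) between the two runs."""
    # present only in the second run: likely introduced by the checked repos
    new_errors = [dep for dep in modified if dep not in base]
    # present only in the first run: likely resolved by the checked repos
    fixed_errors = [dep for dep in base if dep not in modified]
    return new_errors, fixed_errors
```

Failures that appear in both runs are treated as pre-existing and end up in neither list.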
Optionally, additional base repositories can be specified which will not be modified, and
additional new repositories can be specified which will not be checked directly. The former is
@ -17,22 +17,15 @@ multilib scenarios; it may be desirable to use such an additional repository for
the multilib arch(es), if e.g. installability of these should not be tested directly.
An alternative mode allows simply testing the consequences of *removing* a list of source packages
entirely; in this mode, in the second step, the base repository's metadata is modified to entirely
remove all binary packages built from the specified source packages. The installability check is
skipped in this context.
entirely; in this mode, in the second step, we exclude all binary packages built from the specified
source packages. The installability check is skipped in this context.
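As a rough illustration of the exclusion step, the sketch below selects the packages to hide and builds the matching dnf option. It assumes the input is text with one `source_name,nevra` pair per line, as a dnf repoquery with a suitable query format could produce. The names `select_excludes` and `setopt_args` are hypothetical, chosen for this example only.

```python
from collections.abc import Iterable


def select_excludes(query_output: str, removes: Iterable[str]) -> list[str]:
    """Return the NEVRAs whose source package name is in removes.

    Assumes one "source_name,nevra" pair per line of query_output.
    """
    wanted = set(removes)
    excludes = []
    for line in query_output.splitlines():
        parts = line.split(",")
        if len(parts) != 2:
            continue  # skip blank or malformed lines
        source_name, nevra = parts
        if source_name in wanted:
            excludes.append(nevra)
    return excludes


def setopt_args(repoid: str, excludes: list[str]) -> list[str]:
    """Build the --setopt arguments that hide excludes from repo repoid."""
    return ["--setopt", f"{repoid}.excludepkgs={','.join(excludes)}"]
```

The resulting argument pair would be appended to the dnf command line once per modified base repository, so each repository gets its own exclusion list.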
## Requirements
rmdepcheck has no run-time Python dependencies outside the standard library. However, it requires
several command-line utilities:
* dnf
* zstd
* curl
It checks for these, and will exit early with an error if any of them is not found. rmdepcheck
is written primarily for Red Hat-family distributions, but should in theory be usable anywhere
these utilities can be installed (and forward slashes act as directory separators).
rmdepcheck has no run-time Python dependencies outside the standard library. Its only external
dependency is dnf. It checks for dnf, and will exit early with an error if it is not found.
rmdepcheck is written primarily for Red Hat-family distributions, but should in theory be usable
anywhere dnf can be installed (and forward slashes act as directory separators).
If you use a version of dnf older than 5.2.15.0, you may see false failures for 'rich' dependencies,
as older dnf versions did not handle these correctly. Use 5.4.0.0 or newer for the best handling

@ -1,2 +0,0 @@
backports.zstd ; python_version<'3.14'
lxml

@ -56,8 +56,5 @@ show_missing = true
# don't @ me, Hynek
line-length = 100
[tool.mypy]
plugins = ["mypy_plugin_lxml.main"]
[tool.setuptools.dynamic]
dependencies = { file = ["install.requires"] }

@ -19,7 +19,6 @@
#
# Author(s): Adam Williamson <awilliam@redhat.com>
# pylint: disable=c-extension-no-member
"""RPM package installability and reverse-dependency checks using a
repository modification strategy (hence 'rm').
@ -28,60 +27,31 @@ repository modification strategy (hence 'rm').
# Standard libraries
import argparse
import gzip
import hashlib
import json
import lzma
import os
import platform
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from functools import partial
from typing import Any, Generator, Iterable
from typing import Iterable
from urllib.parse import urlparse
import lxml.etree as et
if sys.version_info >= (3, 14):
from compression import zstd
else:
from backports import zstd # pragma: no cover
# type alias for the tuples produced by parse_repoclosure
# can't properly declare this because type statement was only added in
# 3.12, and TypeAlias is deprecated since 3.12 and wasn't in 3.9
DepTuple = tuple[str, str, str]
# Mainly for tests to override to exercise the parser
CHUNKSIZE = 1 << 20
CURLARGS = ("curl", "-s", "-f", "-L", "--retry-delay", "10", "--max-time", "300", "--retry", "5")
# use a fresh temporary cache for each run to avoid collisions between
# runs and polluting the 'real' cache
# pylint: disable-next=consider-using-with
DNFTEMP = tempfile.TemporaryDirectory(prefix="rmdepcheck", dir="/var/tmp")
DNFARGS = ["dnf", "--setopt", f"cachedir={DNFTEMP.name}", "-q", "--disablerepo=*"]
XMLNS = {
"repo": "http://linux.duke.edu/metadata/repo",
"common": "http://linux.duke.edu/metadata/common",
"rpm": "http://linux.duke.edu/metadata/rpm",
}
SUBPCAPTURE = partial(subprocess.run, capture_output=True, text=True, check=False)
SUBPCAPTCHECK = partial(subprocess.run, capture_output=True, text=True, check=True)
SUBPCHECK = partial(subprocess.run, check=True)
REPOHASHES = {}
SAFEPARSER = et.XMLParser(resolve_entities=False)
def mfind(element: et.Element, string: str, ns: dict) -> et.Element:
"""Wrapper for element.find which raises an error if it comes back
with None.
"""
ret = element.find(string, ns)
if ret is not None:
return ret
raise ValueError("Cannot find required element!")
def hash_repo(repo: str) -> str:
@ -130,243 +100,6 @@ def format_rc_errors(errors: list[DepTuple]) -> None:
print(f" {error[2]}")
def get_file(src: str, dest: str) -> None:
"""Just downloads a file from src to dest."""
SUBPCHECK(CURLARGS + ("-o", dest, src))
def download_element(element: et.Element, repourl: str, mrepodir: str) -> str:
"""Given the ET element with information about it, and the URL of
the repo to download from and the local directory to download to,
download the specified data file, returning the filename. Note
mrepodir/repodata is assumed to exist.
"""
elemloc = mfind(element, "repo:location", XMLNS).attrib["href"]
encelemfn = f"{mrepodir}/{elemloc}"
get_file(f"{repourl}/{elemloc}", encelemfn)
return encelemfn
# we can stop using Any and use io.Reader / io.Writer once we stop
# caring about Python < 3.14
@contextmanager
def _open_read(path: str) -> Generator[Any, None, None]:
"""Open a repo metadata file for reading, decompressing transparently.
Written by Cursor 2.6.11 + Claude 4.6 Opus.
"""
if path.endswith(".gz"):
with gzip.open(path, "rb") as f:
yield f
elif path.endswith(".xz"):
with lzma.open(path, "rb") as f:
yield f
elif path.endswith((".zst", ".zstd")):
with zstd.open(path, "rb") as f:
yield f
else:
with open(path, "rb") as f:
yield f
@contextmanager
def _open_write(path: str) -> Generator[Any, None, None]:
"""Open a repo metadata file for writing, compressing transparently.
Written by Cursor 2.6.11 + Claude 4.6 Opus.
"""
if path.endswith(".gz"):
with gzip.open(path, "wb", compresslevel=6) as f:
yield f
elif path.endswith(".xz"):
with lzma.open(path, "wb", preset=6) as f:
yield f
elif path.endswith((".zst", ".zstd")):
with zstd.open(path, "wb", level=3) as f:
yield f
else:
with open(path, "wb") as f:
yield f
def _next_chunk(infh: Any, size: int = 0) -> bytes:
"""Read an arbitrarily-sized chunk of data from infh, ensuring it
ends with a > for ease of parsing. Defaults to quite a large size.
Note that using size=1 acts as 'read to end of next tag'.
"""
if not size:
size = CHUNKSIZE
chunk = infh.read(size)
while chunk and not chunk.endswith(b">"):
char = infh.read(1)
if char:
chunk += char
else:
break
return chunk
def _maybe_remove(currpkg: bytes, removes: Iterable[str], flmode: bool) -> str:
"""Given a bytestring representing a single XML package element
starting "<package" and ending "</package>", parse it to find
the (source) package name and compare against removes to decide
whether it should be removed. Returns the pkgid of the package if
it should be removed, empty string if it should not.
"""
# force in namespace definition, ugh. we have to do this or else
# lxml will refuse to parse the fragment. we know it starts with
# '<package', so replace that
nspkg = b'<package xmlns:rpm="http://linux.duke.edu/metadata/rpm"' + currpkg[8:]
parsed = et.fromstring(nspkg, parser=SAFEPARSER)
if flmode:
pkg = parsed.attrib.get("pkgid", "")
# remove package if name matches removes
if pkg in removes:
return pkg
return ""
# primary mode
# find first child element with a pkgid attribute, usually
# checksum. the text of this element is the pkgid
pkgid = mfind(parsed, "./*[@pkgid]", {}).text or ""
# remove package if source package names matches
if mfind(parsed, "arch", {}).text == "src":
# we are a source package, this is our name
spkg = mfind(parsed, "name", {}).text or ""
else:
# we're binary, this is our srpm
spkg = mfind(mfind(parsed, "format", {}), "rpm:sourcerpm", XMLNS).text or ""
if spkg:
# get name from srpm
spkg = spkg.rsplit("-", 2)[0]
if spkg and spkg in removes:
return pkgid
return ""
def parse_xml(
infh: Any, outfh: Any, removes: Iterable[str], flmode: bool
) -> tuple[set[str], str, int]:
"""Parse primary or filelists XML metadata, remove packages
in removes. Works slightly differently in each mode.
"""
removed: set = set()
# parse infh in chunks, pass through non-package content to outfh.
# parse package content one-by-one, using _maybe_remove to decide
# whether to keep (pass through) or drop each package text blob
currpkg = b""
chunk = _next_chunk(infh)
sha = hashlib.sha256()
size = 0
while chunk:
if currpkg:
# look for end of package text
endpos = chunk.find(b"</package>")
if endpos != -1:
# decide whether to remove package
currpkg += chunk[: endpos + 10]
nextpos = endpos + 10
# if we're at the end of the chunk, read in another
# tag and add it, to simplify the next bit
if nextpos >= len(chunk):
chunk += _next_chunk(infh, size=1)
# if next char is \n, include it in the package block
nextchar = chunk[endpos + 10 : endpos + 11]
if nextchar == b"\n":
currpkg += b"\n"
nextpos = endpos + 11
pkgid = _maybe_remove(currpkg, removes, flmode)
if pkgid:
# we should remove it
removed.add(pkgid)
else:
# pass it through, update csum and size
sha.update(currpkg)
size += len(currpkg)
outfh.write(currpkg)
# reset current package text buffer
currpkg = b""
# move to appropriate position in chunk
chunk = chunk[nextpos:]
continue
# package does not end in current chunk, add whole
# chunk to buffer and read next
currpkg += chunk
chunk = _next_chunk(infh)
continue
# we're not in a package block, so find the next package tag
# there is a potential issue with packager tags, but we should
# always encounter a package tag before we encounter a
# packager tag
startpos = chunk.find(b"<package")
if startpos != -1:
# pass through all content before the tag, updating
# csum and size
sha.update(chunk[:startpos])
size += len(chunk[:startpos])
outfh.write(chunk[:startpos])
# initialize the current package buffer with the tag
currpkg = b"<package"
# move to appropriate position in the chunk, note we
# cannot be at end of chunk as it must end with >
chunk = chunk[startpos + 8 :]
continue
# no package tag found in chunk, pass through whole
# chunk, updating csum and size
sha.update(chunk)
size += len(chunk)
outfh.write(chunk)
# read next chunk
chunk = _next_chunk(infh)
continue
return (removed, sha.hexdigest(), size)
# pylint: disable-next=too-many-locals
def replace_packages(
primfn: str, flfn: str, removes: Iterable[str]
) -> tuple[tuple[str, int, str, int], ...]:
"""Parse the primary and filelists data files, remove any packages
whose source package name matches one in removes, and write out new
files with the correct names (containing their own sha256sum).
Return the checksums and sizes of the new uncompressed and
compressed files, for writing back into the repomd. We use raw
line by line text parsing to do this, because these files are huge
and parsing them with ElementTree.parse uses a huge amount of RAM.
Using iterparse would be messier and harder than doing this.
"""
ret = []
primremoved: Iterable[str] = set()
# find the repodata directory
rddir = os.path.dirname(primfn)
# parse both primary and filelists metadata
for fn, typ, flmode in ((primfn, "primary", False), (flfn, "filelists", True)):
if flmode:
# remove the same packages we removed in the prior iteration
toremove = primremoved
else:
# remove packages built from the srpm names in 'removes'
toremove = removes
# get the file extensions
exts = os.path.basename(fn).split(".")[1:]
# construct a temporary filename with the same extensions
tempfn = ".".join([f"{rddir}/{typ}temp"] + exts)
with _open_read(fn) as infh, _open_write(tempfn) as outfh:
# parse from input file to temporary file with inline
# compression, checksumming and size discovery
removed, opensum, opensize = parse_xml(infh, outfh, toremove, flmode=flmode)
if not flmode:
# populate the to-remove set for the next iteration
primremoved = removed
# get compressed checksum and size
with open(tempfn, "rb") as outfh:
csum = hashlib.sha256(outfh.read()).hexdigest()
size = os.path.getsize(tempfn)
# rename output file to expected name with checksum and type
os.rename(tempfn, ".".join([f"{rddir}/{csum}-{typ}"] + exts))
# return compressed and uncompressed sums and sizes
ret.append((csum, size, opensum, opensize))
return tuple(ret)
def get_base_repoclosure(baserepos: Iterable[str], nmbaserepos: Iterable[str]) -> str:
"""Gets the reference repoclosure text. Both to-be-modified and
not-modified base repos are available to the solver, but only the
@ -381,101 +114,57 @@ def get_base_repoclosure(baserepos: Iterable[str], nmbaserepos: Iterable[str]) -
return SUBPCAPTURE(cmdargs).stdout
# pylint: disable-next=too-many-locals
def get_modified_repoclosure(
def get_modified_and_new_repoclosure(
mrepos: Iterable[str],
mreposdir: str,
nmrepos: Iterable[str],
nrepos: Iterable[str],
nmrepos: list[str],
nrepos: list[str],
removes: Iterable[str],
) -> str:
"""Does the repository metadata modification (the clever bit!) and
returns the modified repoclosure text. Non-modified base repos,
modified base repos after modification, and the new repo are
available to the solver; only modified base repos are checked.
) -> tuple[str, str]:
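"""Runs repoclosure with the new repos included and excludepkgs
used for the modified base repos, and returns a tuple of the
modified repoclosure text and the new repoclosure text (empty if
there are no new repos). Non-modified base repos, modified base
repos after exclusion, and the new repos are available to the
solver. The first run checks only the modified base repos; the
second run checks only the first new repo.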
"""
args = DNFARGS + ["repoclosure"]
queryargs = DNFARGS + ["repoquery", "--queryformat", "%{source_name},%{full_nevra}\n"]
rcargs = DNFARGS + ["repoclosure"]
for mrepo in mrepos:
mrepodir = f"{mreposdir}/{hash_repo(mrepo)}"
os.makedirs(f"{mrepodir}/repodata")
repomdfn = f"{mrepodir}/repodata/repomd.xml"
get_file(f"{mrepo}/repodata/repomd.xml", repomdfn)
repomdtree = et.parse(repomdfn, parser=SAFEPARSER)
repomdroot = repomdtree.getroot()
# we need to also download and modify filelists, for file
# dependencies that aren't included in primary
filelists = mfind(repomdroot, "repo:data[@type='filelists']", XMLNS)
flfn = download_element(filelists, mrepo, mrepodir)
flexts = os.path.basename(flfn).split(".")[1:]
primary = mfind(repomdroot, "repo:data[@type='primary']", XMLNS)
primfn = download_element(primary, mrepo, mrepodir)
primexts = os.path.basename(primfn).split(".")[1:]
# https://github.com/pylint-dev/pylint/issues/5671#issuecomment-4239834783
primdata, fldata = replace_packages(primfn, flfn, removes) # pylint:disable=W0632
# we also need the module metadata if present, or else
# module packages will be treated as non-module and cause
# false results
modules = repomdroot.find("repo:data[@type='modules']", XMLNS)
if modules is not None:
# setting up a test repo with module metadata is a huge pain
download_element(modules, mrepo, mrepodir) # pragma: no cover
# modify the repomd
for typ, data in ((primary, primdata), (filelists, fldata)):
csum, size, opensum, opensize = data
mfind(typ, "repo:checksum", XMLNS).text = csum
mfind(typ, "repo:size", XMLNS).text = str(size)
mfind(typ, "repo:open-checksum", XMLNS).text = opensum
mfind(typ, "repo:open-size", XMLNS).text = str(opensize)
if typ == primary:
mfind(primary, "repo:location", XMLNS).attrib["href"] = ".".join(
[f"repodata/{csum}-primary"] + primexts
)
else:
mfind(filelists, "repo:location", XMLNS).attrib["href"] = ".".join(
[f"repodata/{csum}-filelists"] + flexts
)
# figure out what to exclude
excludes = ""
args = queryargs + ["--repofrompath", f"{hash_repo(mrepo)},{mrepo}"]
out = SUBPCAPTURE(args).stdout
for line in out.splitlines():
# sname, nevr
elems = line.split(",")
if len(elems) != 2:
continue # pragma: no cover
if elems[0] in removes:
# source name matches: exclude this package's NEVRA
if not excludes:
excludes = elems[1]
else:
excludes += f",{elems[1]}"
# add each modified base repo with excludepkgs set to the
# list we discovered above, in the repoclosure args
rcargs.extend(["--repofrompath", f"{hash_repo(mrepo)},{mrepo}"])
rcargs.extend(["--setopt", f"{hash_repo(mrepo)}.excludepkgs={excludes}"])
alldata = repomdroot.findall("repo:data[@type]", XMLNS)
notused = [data for data in alldata if data not in [primary, filelists, modules]]
for item in notused:
repomdroot.remove(item)
repomdtree.write(repomdfn)
# add the modified repo to the repoclosure command
args.extend(["--repofrompath", f"{hash_repo(mrepo)},{mrepodir}"])
# now add the non-modified base repos
for nmrepo in nmrepos:
args.extend(["--repofrompath", f"{hash_repo(nmrepo)},{nmrepo}"])
# now add the new package repos
for nrepo in nrepos:
args.extend(["--repofrompath", f"{hash_repo(nrepo)},{nrepo}"])
# now add the non-modified base repos and new package repos
for repo in nmrepos + nrepos:
rcargs.extend(["--repofrompath", f"{hash_repo(repo)},{repo}"])
# finally, add the check arg
args.append("--check")
args.append(",".join([hash_repo(mrepo) for mrepo in mrepos]))
rcargs.append("--check")
rcargs.append(",".join([hash_repo(mrepo) for mrepo in mrepos]))
ret = SUBPCAPTURE(args).stdout
return ret
def get_new_repoclosure(
mrepos: Iterable[str], mreposdir: str, nmrepos: Iterable[str], nrepo: str
) -> str:
"""Gets and returns repoclosure text for the new repository; this
is effectively an installability check. All base repos are
available to the solver but are not checked. Note this is run
*after* repo modification, so the check runs against the modified
versions of the modifiable base repositories.
"""
cmdargs = DNFARGS + ["repoclosure"]
for mrepo in mrepos:
mrepodir = f"{mreposdir}/{hash_repo(mrepo)}"
cmdargs.extend(["--repofrompath", f"{hash_repo(mrepo)},{mrepodir}"])
for repo in nmrepos:
cmdargs.extend(["--repofrompath", f"{hash_repo(repo)},{repo}"])
cmdargs.extend(["--repofrompath", f"{hash_repo(nrepo)},{nrepo}", "--check", hash_repo(nrepo)])
return SUBPCAPTURE(cmdargs).stdout
# get the modified repoclosure
mod = SUBPCAPTURE(rcargs).stdout
new = ""
if nrepos:
# the first nrepo is the checked repo, only check that
rcargs[-1] = hash_repo(nrepos[0])
# get the new repoclosure
new = SUBPCAPTURE(rcargs).stdout
return (mod, new)
def get_source_packages(repos: Iterable[str]) -> set[str]:
@ -648,7 +337,7 @@ def parse_args() -> argparse.Namespace:
def check_utils() -> None:
"""Check required utilities are installed."""
missing = []
for prog in (("dnf", "--version"), ("curl", "-V")):
for prog in (("dnf", "--version"),):
try:
subprocess.run(prog, stdout=subprocess.DEVNULL, check=True)
except FileNotFoundError:
@ -678,24 +367,17 @@ def main() -> None:
baserc = parse_repoclosure(get_base_repoclosure(args.baserepos, args.nmbaserepos))
# place to stash the modified repos
with tempfile.TemporaryDirectory() as mreposdir:
# get the modified rpmclosure output
modrc = parse_repoclosure(
get_modified_repoclosure(
args.baserepos, mreposdir, args.nmbaserepos, nrepos, sources
)
)
# get the modified repoclosure output
modraw, newraw = get_modified_and_new_repoclosure(
args.baserepos, args.nmbaserepos, nrepos, sources
)
modrc = parse_repoclosure(modraw)
newrc = []
if newraw:
newrc = parse_repoclosure(newraw)
# figure out the diffs
newerrors = [dep for dep in modrc if dep not in baserc]
fixederrors = [dep for dep in baserc if dep not in modrc]
newrc = []
if args.repo:
# get repoclosure on new repo - this is an installability test
newrc = parse_repoclosure(
get_new_repoclosure(args.baserepos, mreposdir, args.nmbaserepos, args.repo)
)
newerrors = [dep for dep in modrc if dep not in baserc]
fixederrors = [dep for dep in baserc if dep not in modrc]
preexisting = handle_preexisting(fixederrors, newrc)
# output

@ -22,22 +22,10 @@
"""Tests for rmdepcheck."""
import glob
import gzip
import io
import lzma
import os
import shutil
import sys
import tempfile
import xml.etree.ElementTree as et
from unittest import mock
if sys.version_info >= (3, 14):
from compression import zstd
else:
from backports import zstd
import pytest
import rmdepcheck
@ -46,15 +34,6 @@ TESTDATA = f"{HERE}/testdata"
REPOS = f"{TESTDATA}/repos"
def test_mfind():
repomdtree = et.parse(f"{REPOS}/base/repodata/repomd.xml")
# test_get_primary and various others test the success path, so
# we'll just test failure, which shouldn't ever really happen
assert repomdtree.find("foobar", {}) is None
with pytest.raises(ValueError):
rmdepcheck.mfind(repomdtree, "foobar", {})
def test_parse_repoclosure():
with open(f"{TESTDATA}/test_parse_repoclosure.txt", "r", encoding="utf-8") as fh:
rctext = fh.read()
@ -80,147 +59,6 @@ def test_format_rc_errors(capsys):
assert captured.out == exptext
def test_get_file():
with tempfile.TemporaryDirectory() as tempdir:
testfile = f"file://{HERE}/test_rmdepcheck.py"
rmdepcheck.get_file(testfile, f"{tempdir}/test_rmdepcheck.py")
with open(f"{tempdir}/test_rmdepcheck.py", "r", encoding="utf-8") as testfh:
assert "This file is" in testfh.read()
def test_download_element():
repomdtree = et.parse(f"{REPOS}/base/repodata/repomd.xml")
repomdroot = repomdtree.getroot()
primary = repomdroot.find("repo:data[@type='primary']", rmdepcheck.XMLNS)
assert isinstance(primary, et.Element)
assert primary.attrib == {"type": "primary"}
with tempfile.TemporaryDirectory() as tempdir:
os.makedirs(f"{tempdir}/repodata")
rmdepcheck.download_element(primary, f"file://{REPOS}/base", tempdir)
# NOTE: this filename changes any time mkrepos.py is run
assert os.path.exists(
# pylint: disable-next=line-too-long
f"{tempdir}/repodata/54942fbb3415a66f4ca49463d36439407c816b3eed8e30fc43016bd70ac9d456-primary.xml.zst"
)
def test_parse_xml_empty():
"""Test parse_xml can handle an empty repo. We mostly test it
implicitly, but easiest to test this explicitly.
"""
empty = b"""<?xml version="1.0" encoding="UTF-8"?>
<metadata xmlns="http://linux.duke.edu/metadata/common" xmlns:rpm="http://linux.duke.edu/metadata/rpm" packages="0">
</metadata>
"""
infh = io.BytesIO(empty)
outfh = io.BytesIO()
assert rmdepcheck.parse_xml(infh, outfh, ["somepackage"], False) == (
set(),
"0f9fd176ae380f60833f432e009774b256c2e77fc50740be60eaaf06d49ee004",
168,
)
outfh.seek(0)
assert outfh.read() == empty
@pytest.mark.parametrize("extension", (".xml", ".xml.zst", ".xml.gz", ".xml.xz"))
@pytest.mark.parametrize(
"repotup",
(
(
"binary",
(
(
"8cba35dd9233f535f90d2c1a6f7c00a8ae8bfcc9b0854f50ef593ab8f685cd7c",
9596,
),
(
"03254e576554e240d9d2434ac9e6f06e71d50adf5e5699acdecd28d0ceafaa37",
1562,
),
),
),
(
"source",
(
(
"c55ba80f7615e041c9d84b8237a6dd4afb9e56c9d7e1776324a164d4e0b921a1",
1082,
),
(
"97c9e867dca2f9f1d0c1f45e77c5ffb9e6b91e5a76b73240bc162aa4d8b8bd9b",
282,
),
),
),
),
)
def test_replace_packages(extension, repotup):
# this exercises some rarely-encountered edge case parser paths
if extension == ".xml":
rmdepcheck.CHUNKSIZE = 1
repo, expected = repotup
with tempfile.TemporaryDirectory() as tempdir:
shutil.copy2(
# binary.xml is an old version of base's primary file
# source.xml is a primary file from a repo with just
# ccc.src and ddd.src packages
f"{TESTDATA}/test_replace_primary_{repo}{extension}",
f"{tempdir}/testprim{extension}",
)
shutil.copy2(
f"{TESTDATA}/test_replace_fl_{repo}{extension}",
f"{tempdir}/testfl{extension}",
)
ret = rmdepcheck.replace_packages(
f"{tempdir}/testprim{extension}", f"{tempdir}/testfl{extension}", "ccc"
)
# skip the compressed sum and size as they may change with
# python version
ret = ((ret[0][2], ret[0][3]), (ret[1][2], ret[1][3]))
assert ret == expected
gotprim = glob.glob(f"{tempdir}/*primary.xml*")[0]
gotfl = glob.glob(f"{tempdir}/*filelists.xml*")[0]
funcmap = {
".xml": open,
".xml.zst": zstd.open,
".xml.gz": gzip.open,
".xml.xz": lzma.open,
}
with funcmap[extension](gotprim, "rb") as gotpfh:
with open(f"{TESTDATA}/test_replace_primary_{repo}_expected.xml", "rb") as exppfh:
assert gotpfh.read() == exppfh.read()
with funcmap[extension](gotfl, "rb") as gotffh:
with open(f"{TESTDATA}/test_replace_fl_{repo}_expected.xml", "rb") as expffh:
assert gotffh.read() == expffh.read()
def test_replace_packages_minified():
rmdepcheck.CHUNKSIZE = 1
with tempfile.TemporaryDirectory() as tempdir:
shutil.copy2(
# same as source but with all newlines removed
f"{TESTDATA}/test_replace_primary_source_minified.xml",
f"{tempdir}/testprim.xml",
)
shutil.copy2(
# same as source but with all newlines removed
f"{TESTDATA}/test_replace_fl_source_minified.xml",
f"{tempdir}/testfl.xml",
)
rmdepcheck.replace_packages(f"{tempdir}/testprim.xml", f"{tempdir}/testfl.xml", "ccc")
gotprim = glob.glob(f"{tempdir}/*primary.xml*")[0]
gotfl = glob.glob(f"{tempdir}/*filelists.xml*")[0]
with open(gotprim, "rb") as gotpfh:
with open(
f"{TESTDATA}/test_replace_primary_source_minified_expected.xml", "rb"
) as exppfh:
assert gotpfh.read() == exppfh.read()
with open(gotfl, "rb") as gotffh:
with open(f"{TESTDATA}/test_replace_fl_source_minified_expected.xml", "rb") as expffh:
assert gotffh.read() == expffh.read()
def test_get_base_repoclosure():
repo = f"file://{REPOS}/base"
with open(f"{TESTDATA}/test_get_base_repoclosure.txt", "r", encoding="utf-8") as testfh:
@ -230,31 +68,19 @@ def test_get_base_repoclosure():
assert ret == expected
def test_get_modified_repoclosure():
brepo = f"file://{REPOS}/base"
with open(f"{TESTDATA}/test_get_modified_repoclosure.txt", "r", encoding="utf-8") as testfh:
expected = testfh.read()
expected = expected.replace("{HASH}", rmdepcheck.hash_repo(brepo))
with tempfile.TemporaryDirectory() as tempdir:
ret = rmdepcheck.get_modified_repoclosure(
[brepo], tempdir, [], [f"file://{REPOS}/new"], ["aaa", "ccc", "eee", "fff", "ggg"]
)
assert ret == expected
def test_get_new_repoclosure():
def test_get_modified_and_new_repoclosure():
brepo = f"file://{REPOS}/base"
nrepo = f"file://{REPOS}/new"
with open(f"{TESTDATA}/test_get_modified_repoclosure.txt", "r", encoding="utf-8") as testfh:
expectedmod = testfh.read()
expectedmod = expectedmod.replace("{HASH}", rmdepcheck.hash_repo(brepo))
with open(f"{TESTDATA}/test_get_new_repoclosure.txt", "r", encoding="utf-8") as testfh:
expected = testfh.read()
expected = expected.replace("{HASH}", rmdepcheck.hash_repo(nrepo))
with tempfile.TemporaryDirectory() as tempdir:
# populate the modified repo dir for accuracy
rmdepcheck.get_modified_repoclosure(
[brepo], tempdir, [], [f"file://{REPOS}/new"], ["aaa", "ccc", "eee", "fff", "ggg"]
)
ret = rmdepcheck.get_new_repoclosure([brepo], tempdir, [], nrepo)
assert ret == expected
expectednew = testfh.read()
expectednew = expectednew.replace("{HASH}", rmdepcheck.hash_repo(nrepo))
ret = rmdepcheck.get_modified_and_new_repoclosure(
[brepo], [], [f"file://{REPOS}/new"], ["aaa", "ccc", "eee", "fff", "ggg"]
)
assert ret == (expectedmod, expectednew)
def test_get_source_packages():
@ -341,10 +167,10 @@ def test_check_arch(_):
@mock.patch("subprocess.run", autospec=True)
def test_check_utils(mock_run):
rmdepcheck.check_utils()
mock_run.side_effect = [None, FileNotFoundError]
mock_run.side_effect = FileNotFoundError
with pytest.raises(SystemExit) as excinfo:
rmdepcheck.check_utils()
assert excinfo.value.code == "Please install missing required utilities: curl"
assert excinfo.value.code == "Please install missing required utilities: dnf"
@mock.patch("rmdepcheck.check_utils", side_effect=KeyboardInterrupt)

@ -1,8 +1,6 @@
black
coverage
diff-cover
lxml
mypy
pylint
pytest-cov
types-lxml