Drop XML parsing, switch to using dnf excludepkgs (#5)
Instead of painfully retrieving, decompressing, parsing, editing and reconstructing the repodata, let's use dnf's excludepkgs option to achieve the same thing. This is much simpler, faster, and more robust.

To 'modify' the modified base repos, we use dnf repoquery to list the packages they contain and their source RPM names, then construct a table with the NEVRs to be removed for each repo. Then when we do the repoclosure command, we pass --setopt <repo>.excludepkgs=<nevrlist> for each modified repo. This tells dnf to act as if the specified NEVRs simply do not exist in the specified repo, which is exactly what we want to achieve.

Signed-off-by: Adam Williamson <awilliam@redhat.com>
parent f209553299
commit db1414151c
6 changed files with 81 additions and 587 deletions
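The approach described in the commit message can be sketched in isolation as follows. This is only an illustrative sketch, not the code from the commit: the helper names `build_excludes` and `excludepkgs_args`, the repo id `base`, and the sample package names are all hypothetical. It assumes repoquery output with one `source_name,full_nevra` pair per line, as produced by the query format used in the diff below.

```python
from typing import Iterable


def build_excludes(query_output: str, removes: Iterable[str]) -> list[str]:
    """Collect the NEVRAs whose source package name is in removes.

    query_output is assumed to hold one "source_name,full_nevra" pair
    per line (hypothetical sample data is used below).
    """
    wanted = set(removes)
    excludes = []
    for line in query_output.splitlines():
        parts = line.split(",")
        if len(parts) != 2:
            # skip anything that is not a clean two-field line
            continue
        source_name, nevra = parts
        if source_name in wanted:
            excludes.append(nevra)
    return excludes


def excludepkgs_args(repo_id: str, excludes: list[str]) -> list[str]:
    """Build the per-repo --setopt arguments for the repoclosure call."""
    return ["--setopt", f"{repo_id}.excludepkgs={','.join(excludes)}"]


# hypothetical repoquery output for a repo with id "base"
sample = (
    "aaa,aaa-1.0-1.noarch\n"
    "bbb,bbb-libs-2.0-1.x86_64\n"
    "aaa,aaa-devel-1.0-1.noarch\n"
)
excludes = build_excludes(sample, ["aaa"])
args = excludepkgs_args("base", excludes)
```

Keeping the parsing separate from the dnf invocation makes the exclusion logic easy to check without a real repository.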
31
README.md
@@ -2,12 +2,12 @@
 
 rmdepcheck is an RPM dependency check tool based on a repository metadata modification approach.
 It works by comparing a checked repository to one or more base repositories. First, checks are run
-on the base repositories as-is. Next, modified copies of the base repositories' metadata is
-created, with all packages from the same source RPM(s) as the package(s) in the checked
-repositories removed. Finally, checks are run again on the modified base repositories, with the
-checked repositories available to the dependency solver. The results of the two runs are compared.
-New failures should indicate problems introduced by the checked repositories. Also, some relevant
-checks are run on the checked repositories with reference to the modified base repositories.
+on the base repositories as-is. Next, we re-run the checks, but with the checked repository
+available and using dnf's `excludepkgs` option to hide from the base repositories all packages from
+the same source RPM(s) as the package(s) in the checked repositories removed. The results of the
+two runs are compared. New failures should indicate problems introduced by the checked repositories.
+Also, some relevant checks are run on the checked repositories with reference to the modified base
+repositories.
 
 Optionally, additional base repositories can be specified which will not be modified, and
 additional new repositories can be specified which will not be checked directly. The former is
@@ -17,22 +17,15 @@ multilib scenarios; it may be desirable to use such an additional repository for
 the multilib arch(es), if e.g. installability of these should not be tested directly.
 
 An alternative mode allows simply testing the consequences of *removing* a list of source packages
-entirely; in this mode, in the second step, the base repository's metadata is modified to entirely
-remove all binary packages built from the specified source packages. The installability check is
-skipped in this context.
+entirely; in this mode, in the second step, we exclude all binary packages built from the specified
+source packages. The installability check is skipped in this context.
 
 ## Requirements
 
-rmdepcheck has no run-time Python dependencies outside the standard library. However, it requires
-several command-line utilities:
-
-* dnf
-* zstd
-* curl
-
-It checks for these, and will exit early with an error if any of them is not found. rmdepcheck
-is written primarily for Red Hat-family distributions, but should in theory be usable anywhere
-these utilities can be installed (and forward slashes act as directory separators).
+rmdepcheck has no run-time Python dependencies outside the standard library. Its only external
+dependency is dnf. It checks for dnf, and will exit early with an error if it is not found.
+rmdepcheck is written primarily for Red Hat-family distributions, but should in theory be usable
+anywhere these utilities can be installed (and forward slashes act as directory separators).
 
 If you use a version of dnf older than 5.2.15.0, you may see false failures for 'rich' dependencies,
 as older dnf versions did not handle these correctly. Use 5.4.0.0 or newer for the best handling

@@ -1,2 +0,0 @@
-backports.zstd ; python_version<'3.14'
-lxml

@@ -56,8 +56,5 @@ show_missing = true
 # don't @ me, Hynek
 line-length = 100
 
-[tool.mypy]
-plugins = ["mypy_plugin_lxml.main"]
-
 [tool.setuptools.dynamic]
 dependencies = { file = ["install.requires"] }

428
rmdepcheck.py
@@ -19,7 +19,6 @@
 #
 # Author(s): Adam Williamson <awilliam@redhat.com>
 
-# pylint: disable=c-extension-no-member
 
 """RPM package installability and reverse-dependency checks using a
 repository modification strategy (hence 'rm').
@@ -28,60 +27,31 @@ repository modification strategy (hence 'rm').
 # Standard libraries
 
 import argparse
-import gzip
 import hashlib
 import json
-import lzma
 import os
 import platform
 import subprocess
 import sys
 import tempfile
-from contextlib import contextmanager
 from functools import partial
-from typing import Any, Generator, Iterable
+from typing import Iterable
 from urllib.parse import urlparse
 
-import lxml.etree as et
-
-if sys.version_info >= (3, 14):
-    from compression import zstd
-else:
-    from backports import zstd  # pragma: no cover
-
 # type alias for the tuples produced by parse_repoclosure
 # can't properly declare this because type statement was only added in
 # 3.12, and TypeAlias is deprecated since 3.12 and wasn't in 3.9
 DepTuple = tuple[str, str, str]
 
-# Mainly for tests to override to exercise the parser
-CHUNKSIZE = 1 << 20
-CURLARGS = ("curl", "-s", "-f", "-L", "--retry-delay", "10", "--max-time", "300", "--retry", "5")
 # use a fresh temporary cache for each run to avoid collisions between
 # runs and polluting the 'real' cache
 # pylint: disable-next=consider-using-with
 DNFTEMP = tempfile.TemporaryDirectory(prefix="rmdepcheck", dir="/var/tmp")
 DNFARGS = ["dnf", "--setopt", f"cachedir={DNFTEMP.name}", "-q", "--disablerepo=*"]
-XMLNS = {
-    "repo": "http://linux.duke.edu/metadata/repo",
-    "common": "http://linux.duke.edu/metadata/common",
-    "rpm": "http://linux.duke.edu/metadata/rpm",
-}
 SUBPCAPTURE = partial(subprocess.run, capture_output=True, text=True, check=False)
 SUBPCAPTCHECK = partial(subprocess.run, capture_output=True, text=True, check=True)
 SUBPCHECK = partial(subprocess.run, check=True)
 REPOHASHES = {}
-SAFEPARSER = et.XMLParser(resolve_entities=False)
-
-
-def mfind(element: et.Element, string: str, ns: dict) -> et.Element:
-    """Wrapper for element.find which raises an error if it comes back
-    with None.
-    """
-    ret = element.find(string, ns)
-    if ret is not None:
-        return ret
-    raise ValueError("Cannot find required element!")
 
 
 def hash_repo(repo: str) -> str:
@@ -130,243 +100,6 @@ def format_rc_errors(errors: list[DepTuple]) -> None:
         print(f" {error[2]}")
 
 
-def get_file(src: str, dest: str) -> None:
-    """Just downloads a file from src to dest."""
-    SUBPCHECK(CURLARGS + ("-o", dest, src))
-
-
-def download_element(element: et.Element, repourl: str, mrepodir: str) -> str:
-    """Given the ET element with information about it, and the URL of
-    the repo to download from and the local directory to download to,
-    download the specified data file, returning the filename. Note
-    mrepodir/repodata is assumed to exist.
-    """
-    elemloc = mfind(element, "repo:location", XMLNS).attrib["href"]
-    encelemfn = f"{mrepodir}/{elemloc}"
-    get_file(f"{repourl}/{elemloc}", encelemfn)
-    return encelemfn
-
-
-# we can stop using Any and use io.Reader / io.Writer once we stop
-# caring about Python < 3.14
-@contextmanager
-def _open_read(path: str) -> Generator[Any, None, None]:
-    """Open a repo metadata file for reading, decompressing transparently.
-    Written by Cursor 2.6.11 + Claude 4.6 Opus.
-    """
-    if path.endswith(".gz"):
-        with gzip.open(path, "rb") as f:
-            yield f
-    elif path.endswith(".xz"):
-        with lzma.open(path, "rb") as f:
-            yield f
-    elif path.endswith((".zst", ".zstd")):
-        with zstd.open(path, "rb") as f:
-            yield f
-    else:
-        with open(path, "rb") as f:
-            yield f
-
-
-@contextmanager
-def _open_write(path: str) -> Generator[Any, None, None]:
-    """Open a repo metadata file for writing, compressing transparently.
-    Written by Cursor 2.6.11 + Claude 4.6 Opus.
-    """
-    if path.endswith(".gz"):
-        with gzip.open(path, "wb", compresslevel=6) as f:
-            yield f
-    elif path.endswith(".xz"):
-        with lzma.open(path, "wb", preset=6) as f:
-            yield f
-    elif path.endswith((".zst", ".zstd")):
-        with zstd.open(path, "wb", level=3) as f:
-            yield f
-    else:
-        with open(path, "wb") as f:
-            yield f
-
-
-def _next_chunk(infh: Any, size: int = 0) -> bytes:
-    """Read an arbitrarily-sized chunk of data from infh, ensuring it
-    ends with a > for ease of parsing. Defaults to quite a large size.
-    Note that using size=1 acts as 'read to end of next tag'.
-    """
-    if not size:
-        size = CHUNKSIZE
-    chunk = infh.read(size)
-    while chunk and not chunk.endswith(b">"):
-        char = infh.read(1)
-        if char:
-            chunk += char
-        else:
-            break
-    return chunk
-
-
-def _maybe_remove(currpkg: bytes, removes: Iterable[str], flmode: bool) -> str:
-    """Given a bytestring representing a single XML package element
-    starting "<package" and ending "</package>", parse it to find
-    the (source) package name and compare against removes to decide
-    whether it should be removed. Returns the pkgid of the package if
-    it should be removed, empty string if it should not.
-    """
-    # force in namespace definition, ugh. we have to do this or else
-    # lxml will refuse to parse the fragment. we know it starts with
-    # '<package', so replace that
-    nspkg = b'<package xmlns:rpm="http://linux.duke.edu/metadata/rpm"' + currpkg[8:]
-    parsed = et.fromstring(nspkg, parser=SAFEPARSER)
-    if flmode:
-        pkg = parsed.attrib.get("pkgid", "")
-        # remove package if name matches removes
-        if pkg in removes:
-            return pkg
-        return ""
-    # primary mode
-    # find first child element with a pkgid attribute, usually
-    # checksum. the text of this element is the pkgid
-    pkgid = mfind(parsed, "./*[@pkgid]", {}).text or ""
-    # remove package if source package names matches
-    if mfind(parsed, "arch", {}).text == "src":
-        # we are a source package, this is our name
-        spkg = mfind(parsed, "name", {}).text or ""
-    else:
-        # we're binary, this is our srpm
-        spkg = mfind(mfind(parsed, "format", {}), "rpm:sourcerpm", XMLNS).text or ""
-        if spkg:
-            # get name from srpm
-            spkg = spkg.rsplit("-", 2)[0]
-    if spkg and spkg in removes:
-        return pkgid
-    return ""
-
-
-def parse_xml(
-    infh: Any, outfh: Any, removes: Iterable[str], flmode: bool
-) -> tuple[set[str], str, int]:
-    """Parse primary or filelists XML metadata, remove packages
-    in removes. Works slightly differently in each mode.
-    """
-    removed: set = set()
-    # parse infh in chunks, pass through non-package content to outfh.
-    # parse package content one-by-one, using _maybe_remove to decide
-    # whether to keep (pass through) or drop each package text blob
-    currpkg = b""
-    chunk = _next_chunk(infh)
-    sha = hashlib.sha256()
-    size = 0
-    while chunk:
-        if currpkg:
-            # look for end of package text
-            endpos = chunk.find(b"</package>")
-            if endpos != -1:
-                # decide whether to remove package
-                currpkg += chunk[: endpos + 10]
-                nextpos = endpos + 10
-                # if we're at the end of the chunk, read in another
-                # tag and add it, to simplify the next bit
-                if nextpos >= len(chunk):
-                    chunk += _next_chunk(infh, size=1)
-                # if next char is \n, include it in the package block
-                nextchar = chunk[endpos + 10 : endpos + 11]
-                if nextchar == b"\n":
-                    currpkg += b"\n"
-                    nextpos = endpos + 11
-                pkgid = _maybe_remove(currpkg, removes, flmode)
-                if pkgid:
-                    # we should remove it
-                    removed.add(pkgid)
-                else:
-                    # pass it through, update csum and size
-                    sha.update(currpkg)
-                    size += len(currpkg)
-                    outfh.write(currpkg)
-                # reset current package text buffer
-                currpkg = b""
-                # move to appropriate position in chunk
-                chunk = chunk[nextpos:]
-                continue
-            # package does not end in current chunk, add whole
-            # chunk to buffer and read next
-            currpkg += chunk
-            chunk = _next_chunk(infh)
-            continue
-        # we're not in a package block, so find the next package tag
-        # there is a potential issue with packager tags, but we should
-        # always encounter a package tag before we encounter a
-        # packager tag
-        startpos = chunk.find(b"<package")
-        if startpos != -1:
-            # pass through all content before the tag, updating
-            # csum and size
-            sha.update(chunk[:startpos])
-            size += len(chunk[:startpos])
-            outfh.write(chunk[:startpos])
-            # initialize the current package buffer with the tag
-            currpkg = b"<package"
-            # move to appropriate position in the chunk, note we
-            # cannot be at end of chunk as it must end with >
-            chunk = chunk[startpos + 8 :]
-            continue
-        # no package tag found in chunk, pass through whole
-        # chunk, updating csum and size
-        sha.update(chunk)
-        size += len(chunk)
-        outfh.write(chunk)
-        # read next chunk
-        chunk = _next_chunk(infh)
-        continue
-
-    return (removed, sha.hexdigest(), size)
-
-
-# pylint: disable-next=too-many-locals
-def replace_packages(
-    primfn: str, flfn: str, removes: Iterable[str]
-) -> tuple[tuple[str, int, str, int], ...]:
-    """Parse the primary and filelists data files, remove any packages
-    whose source package name matches one in removes, and write out new
-    files with the correct names (containing their own sha256sum).
-    Return the checksums and sizes of the new uncompressed and
-    compressed files, for writing back into the repomd. We use raw
-    line by line text parsing to do this, because these files are huge
-    and parsing them with ElementTree.parse uses a huge amount of RAM.
-    Using iterparse would be messier and harder than doing this.
-    """
-    ret = []
-    primremoved: Iterable[str] = set()
-    # find the repodata directory
-    rddir = os.path.dirname(primfn)
-    # parse both primary and filelists metadata
-    for fn, typ, flmode in ((primfn, "primary", False), (flfn, "filelists", True)):
-        if flmode:
-            # remove the same packages we removed in the prior iteration
-            toremove = primremoved
-        else:
-            # remove packages built from the srpm names in 'removes'
-            toremove = removes
-        # get the file extensions
-        exts = os.path.basename(fn).split(".")[1:]
-        # construct a temporary filename with the same extensions
-        tempfn = ".".join([f"{rddir}/{typ}temp"] + exts)
-        with _open_read(fn) as infh, _open_write(tempfn) as outfh:
-            # parse from input file to temporary file with inline
-            # compression, checksumming and size discovery
-            removed, opensum, opensize = parse_xml(infh, outfh, toremove, flmode=flmode)
-        if not flmode:
-            # populate the to-remove set for the next iteration
-            primremoved = removed
-        # get compressed checksum and size
-        with open(tempfn, "rb") as outfh:
-            csum = hashlib.sha256(outfh.read()).hexdigest()
-        size = os.path.getsize(tempfn)
-        # rename output file to expected name with checksum and type
-        os.rename(tempfn, ".".join([f"{rddir}/{csum}-{typ}"] + exts))
-        # return compressed and uncompressed sums and sizes
-        ret.append((csum, size, opensum, opensize))
-    return tuple(ret)
-
-
 def get_base_repoclosure(baserepos: Iterable[str], nmbaserepos: Iterable[str]) -> str:
     """Gets the reference repoclosure text. Both to-be-modified and
     not-modified base repos are available to the solver, but only the
@@ -381,101 +114,57 @@ def get_base_repoclosure(baserepos: Iterable[str], nmbaserepos: Iterable[str]) -
     return SUBPCAPTURE(cmdargs).stdout
 
 
-# pylint: disable-next=too-many-locals
-def get_modified_repoclosure(
+def get_modified_and_new_repoclosure(
     mrepos: Iterable[str],
-    mreposdir: str,
-    nmrepos: Iterable[str],
-    nrepos: Iterable[str],
+    nmrepos: list[str],
+    nrepos: list[str],
     removes: Iterable[str],
-) -> str:
-    """Does the repository metadata modification (the clever bit!) and
-    returns the modified repoclosure text. Non-modified base repos,
-    modified base repos after modification, and the new repo are
-    available to the solver; only modified base repos are checked.
+) -> tuple[str, str]:
+    """Runs repoclosure with new repo included and excludepkgs used
+    for modified base repos, and returns the modified repoclosure
+    text. Non-modified base repos, modified base repos after
+    modification, and the new repo are available to the solver; only
+    modified base repos are checked.
     """
-    args = DNFARGS + ["repoclosure"]
+    queryargs = DNFARGS + ["repoquery", "--queryformat", "%{source_name},%{full_nevra}\n"]
+    rcargs = DNFARGS + ["repoclosure"]
     for mrepo in mrepos:
-        mrepodir = f"{mreposdir}/{hash_repo(mrepo)}"
-        os.makedirs(f"{mrepodir}/repodata")
-        repomdfn = f"{mrepodir}/repodata/repomd.xml"
-        get_file(f"{mrepo}/repodata/repomd.xml", repomdfn)
-        repomdtree = et.parse(repomdfn, parser=SAFEPARSER)
-        repomdroot = repomdtree.getroot()
-        # we need to also download and modify filelists, for file
-        # dependencies that aren't included in primary
-        filelists = mfind(repomdroot, "repo:data[@type='filelists']", XMLNS)
-        flfn = download_element(filelists, mrepo, mrepodir)
-        flexts = os.path.basename(flfn).split(".")[1:]
-        primary = mfind(repomdroot, "repo:data[@type='primary']", XMLNS)
-        primfn = download_element(primary, mrepo, mrepodir)
-        primexts = os.path.basename(primfn).split(".")[1:]
-        # https://github.com/pylint-dev/pylint/issues/5671#issuecomment-4239834783
-        primdata, fldata = replace_packages(primfn, flfn, removes)  # pylint:disable=W0632
-        # we also need the module metadata if present, or else
-        # module packages will be treated as non-module and cause
-        # false results
-        modules = repomdroot.find("repo:data[@type='modules']", XMLNS)
-        if modules is not None:
-            # setting up a test repo with module metadata is a huge pain
-            download_element(modules, mrepo, mrepodir)  # pragma: no cover
-        # modify the repomd
-        for typ, data in ((primary, primdata), (filelists, fldata)):
-            csum, size, opensum, opensize = data
-            mfind(typ, "repo:checksum", XMLNS).text = csum
-            mfind(typ, "repo:size", XMLNS).text = str(size)
-            mfind(typ, "repo:open-checksum", XMLNS).text = opensum
-            mfind(typ, "repo:open-size", XMLNS).text = str(opensize)
-            if typ == primary:
-                mfind(primary, "repo:location", XMLNS).attrib["href"] = ".".join(
-                    [f"repodata/{csum}-primary"] + primexts
-                )
-            else:
-                mfind(filelists, "repo:location", XMLNS).attrib["href"] = ".".join(
-                    [f"repodata/{csum}-filelists"] + flexts
-                )
+        # figure out what to exclude
+        excludes = ""
+        args = queryargs + ["--repofrompath", f"{hash_repo(mrepo)},{mrepo}"]
+        out = SUBPCAPTURE(args).stdout
+        for line in out.splitlines():
+            # sname, nevr
+            elems = line.split(",")
+            if len(elems) != 2:
+                continue  # pragma: no cover
+            if elems[0] in removes:
+                if not excludes:
+                    excludes = elems[1]
+                else:
+                    excludes += f",{elems[1]}"
+        # add each modified base repo with excludepkgs set to the
+        # list we discovered above, in the repoclosure args
+        rcargs.extend(["--repofrompath", f"{hash_repo(mrepo)},{mrepo}"])
+        rcargs.extend(["--setopt", f"{hash_repo(mrepo)}.excludepkgs={excludes}"])
 
-        alldata = repomdroot.findall("repo:data[@type]", XMLNS)
-        notused = [data for data in alldata if data not in [primary, filelists, modules]]
-        for item in notused:
-            repomdroot.remove(item)
-        repomdtree.write(repomdfn)
-        # add the modified repo to the repoclosure command
-        args.extend(["--repofrompath", f"{hash_repo(mrepo)},{mrepodir}"])
-
-    # now add the non-modified base repos
-    for nmrepo in nmrepos:
-        args.extend(["--repofrompath", f"{hash_repo(nmrepo)},{nmrepo}"])
-
-    # now add the new package repos
-    for nrepo in nrepos:
-        args.extend(["--repofrompath", f"{hash_repo(nrepo)},{nrepo}"])
+    # now add the non-modified base repos and new package repos
+    for repo in nmrepos + nrepos:
+        rcargs.extend(["--repofrompath", f"{hash_repo(repo)},{repo}"])
 
     # finally, add the check arg
-    args.append("--check")
-    args.append(",".join([hash_repo(mrepo) for mrepo in mrepos]))
+    rcargs.append("--check")
+    rcargs.append(",".join([hash_repo(mrepo) for mrepo in mrepos]))
 
-    ret = SUBPCAPTURE(args).stdout
-    return ret
-
-
-def get_new_repoclosure(
-    mrepos: Iterable[str], mreposdir: str, nmrepos: Iterable[str], nrepo: str
-) -> str:
-    """Gets and returns repoclosure text for the new repository; this
-    is effectively an installability check. All base repos are
-    available to the solver but are not checked. Note this is run
-    *after* repo modification, so the check runs against the modified
-    versions of the modifiable base repositories.
-    """
-    cmdargs = DNFARGS + ["repoclosure"]
-    for mrepo in mrepos:
-        mrepodir = f"{mreposdir}/{hash_repo(mrepo)}"
-        cmdargs.extend(["--repofrompath", f"{hash_repo(mrepo)},{mrepodir}"])
-    for repo in nmrepos:
-        cmdargs.extend(["--repofrompath", f"{hash_repo(repo)},{repo}"])
-    cmdargs.extend(["--repofrompath", f"{hash_repo(nrepo)},{nrepo}", "--check", hash_repo(nrepo)])
-    return SUBPCAPTURE(cmdargs).stdout
+    # get the modified repoclosure
+    mod = SUBPCAPTURE(rcargs).stdout
+    new = ""
+    if nrepos:
+        # the first nrepo is the checked repo, only check that
+        rcargs[-1] = hash_repo(nrepos[0])
+        # get the new repoclosure
+        new = SUBPCAPTURE(rcargs).stdout
+    return (mod, new)
 
 
 def get_source_packages(repos: Iterable[str]) -> set[str]:
@@ -648,7 +337,7 @@ def parse_args() -> argparse.Namespace:
 def check_utils() -> None:
     """Check required utilities are installed."""
     missing = []
-    for prog in (("dnf", "--version"), ("curl", "-V")):
+    for prog in (("dnf", "--version"),):
         try:
             subprocess.run(prog, stdout=subprocess.DEVNULL, check=True)
         except FileNotFoundError:
@@ -678,24 +367,17 @@ def main() -> None:
 
     baserc = parse_repoclosure(get_base_repoclosure(args.baserepos, args.nmbaserepos))
 
-    # place to stash the modified repos
-    with tempfile.TemporaryDirectory() as mreposdir:
-        # get the modified rpmclosure output
-        modrc = parse_repoclosure(
-            get_modified_repoclosure(
-                args.baserepos, mreposdir, args.nmbaserepos, nrepos, sources
-            )
-        )
+    # get the modified rpmclosure output
+    modraw, newraw = get_modified_and_new_repoclosure(
+        args.baserepos, args.nmbaserepos, nrepos, sources
+    )
+    modrc = parse_repoclosure(modraw)
+    newrc = []
+    if newraw:
+        newrc = parse_repoclosure(newraw)
 
-        # figure out the diffs
-        newerrors = [dep for dep in modrc if dep not in baserc]
-        fixederrors = [dep for dep in baserc if dep not in modrc]
-        newrc = []
-        if args.repo:
-            # get repoclosure on new repo - this is an installability test
-            newrc = parse_repoclosure(
-                get_new_repoclosure(args.baserepos, mreposdir, args.nmbaserepos, args.repo)
-            )
+    newerrors = [dep for dep in modrc if dep not in baserc]
+    fixederrors = [dep for dep in baserc if dep not in modrc]
     preexisting = handle_preexisting(fixederrors, newrc)
 
     # output

@@ -22,22 +22,10 @@
 
 """Tests for rmdepcheck."""
 
-import glob
-import gzip
-import io
-import lzma
 import os
-import shutil
-import sys
 import tempfile
-import xml.etree.ElementTree as et
 from unittest import mock
 
-if sys.version_info >= (3, 14):
-    from compression import zstd
-else:
-    from backports import zstd
-
 import pytest
 import rmdepcheck
 
@@ -46,15 +34,6 @@ TESTDATA = f"{HERE}/testdata"
 REPOS = f"{TESTDATA}/repos"
 
 
-def test_mfind():
-    repomdtree = et.parse(f"{REPOS}/base/repodata/repomd.xml")
-    # test_get_primary and various others test the success path, so
-    # we'll just test failure, which shouldn't ever really happen
-    assert repomdtree.find("foobar", {}) is None
-    with pytest.raises(ValueError):
-        rmdepcheck.mfind(repomdtree, "foobar", {})
-
-
 def test_parse_repoclosure():
     with open(f"{TESTDATA}/test_parse_repoclosure.txt", "r", encoding="utf-8") as fh:
         rctext = fh.read()
@@ -80,147 +59,6 @@ def test_format_rc_errors(capsys):
     assert captured.out == exptext
 
 
-def test_get_file():
-    with tempfile.TemporaryDirectory() as tempdir:
-        testfile = f"file://{HERE}/test_rmdepcheck.py"
-        rmdepcheck.get_file(testfile, f"{tempdir}/test_rmdepcheck.py")
-        with open(f"{tempdir}/test_rmdepcheck.py", "r", encoding="utf-8") as testfh:
-            assert "This file is" in testfh.read()
-
-
-def test_download_element():
-    repomdtree = et.parse(f"{REPOS}/base/repodata/repomd.xml")
-    repomdroot = repomdtree.getroot()
-    primary = repomdroot.find("repo:data[@type='primary']", rmdepcheck.XMLNS)
-    assert isinstance(primary, et.Element)
-    assert primary.attrib == {"type": "primary"}
-    with tempfile.TemporaryDirectory() as tempdir:
-        os.makedirs(f"{tempdir}/repodata")
-        rmdepcheck.download_element(primary, f"file://{REPOS}/base", tempdir)
-        # NOTE: this filename changes any time mkrepos.py is run
-        assert os.path.exists(
-            # pylint: disable-next=line-too-long
-            f"{tempdir}/repodata/54942fbb3415a66f4ca49463d36439407c816b3eed8e30fc43016bd70ac9d456-primary.xml.zst"
-        )
-
-
-def test_parse_xml_empty():
-    """Test parse_xml can handle an empty repo. We mostly test it
-    implicitly, but easiest to test this explicitly.
-    """
-    empty = b"""<?xml version="1.0" encoding="UTF-8"?>
-<metadata xmlns="http://linux.duke.edu/metadata/common" xmlns:rpm="http://linux.duke.edu/metadata/rpm" packages="0">
-</metadata>
-"""
-    infh = io.BytesIO(empty)
-    outfh = io.BytesIO()
-    assert rmdepcheck.parse_xml(infh, outfh, ["somepackage"], False) == (
-        set(),
-        "0f9fd176ae380f60833f432e009774b256c2e77fc50740be60eaaf06d49ee004",
-        168,
-    )
-    outfh.seek(0)
-    assert outfh.read() == empty
-
-
-@pytest.mark.parametrize("extension", (".xml", ".xml.zst", ".xml.gz", ".xml.xz"))
-@pytest.mark.parametrize(
-    "repotup",
-    (
-        (
-            "binary",
-            (
-                (
-                    "8cba35dd9233f535f90d2c1a6f7c00a8ae8bfcc9b0854f50ef593ab8f685cd7c",
-                    9596,
-                ),
-                (
-                    "03254e576554e240d9d2434ac9e6f06e71d50adf5e5699acdecd28d0ceafaa37",
-                    1562,
-                ),
-            ),
-        ),
-        (
-            "source",
-            (
-                (
-                    "c55ba80f7615e041c9d84b8237a6dd4afb9e56c9d7e1776324a164d4e0b921a1",
-                    1082,
-                ),
-                (
-                    "97c9e867dca2f9f1d0c1f45e77c5ffb9e6b91e5a76b73240bc162aa4d8b8bd9b",
-                    282,
-                ),
-            ),
-        ),
-    ),
-)
-def test_replace_packages(extension, repotup):
-    # this exercises some rarely-encountered edge case parser paths
-    if extension == ".xml":
-        rmdepcheck.CHUNKSIZE = 1
-    repo, expected = repotup
-    with tempfile.TemporaryDirectory() as tempdir:
-        shutil.copy2(
-            # binary.xml is an old version of base's primary file
-            # source.xml is a primary file from a repo with just
-            # ccc.src and ddd.src packages
-            f"{TESTDATA}/test_replace_primary_{repo}{extension}",
-            f"{tempdir}/testprim{extension}",
-        )
-        shutil.copy2(
-            f"{TESTDATA}/test_replace_fl_{repo}{extension}",
-            f"{tempdir}/testfl{extension}",
-        )
-        ret = rmdepcheck.replace_packages(
-            f"{tempdir}/testprim{extension}", f"{tempdir}/testfl{extension}", "ccc"
-        )
-        # skip the compressed sum and size as they may change with
-        # python version
-        ret = ((ret[0][2], ret[0][3]), (ret[1][2], ret[1][3]))
-        assert ret == expected
-        gotprim = glob.glob(f"{tempdir}/*primary.xml*")[0]
-        gotfl = glob.glob(f"{tempdir}/*filelists.xml*")[0]
-        funcmap = {
-            ".xml": open,
-            ".xml.zst": zstd.open,
-            ".xml.gz": gzip.open,
-            ".xml.xz": lzma.open,
-        }
-        with funcmap[extension](gotprim, "rb") as gotpfh:
-            with open(f"{TESTDATA}/test_replace_primary_{repo}_expected.xml", "rb") as exppfh:
-                assert gotpfh.read() == exppfh.read()
-        with funcmap[extension](gotfl, "rb") as gotffh:
-            with open(f"{TESTDATA}/test_replace_fl_{repo}_expected.xml", "rb") as expffh:
-                assert gotffh.read() == expffh.read()
-
-
-def test_replace_packages_minified():
-    rmdepcheck.CHUNKSIZE = 1
-    with tempfile.TemporaryDirectory() as tempdir:
-        shutil.copy2(
-            # same as source but with all newlines removed
-            f"{TESTDATA}/test_replace_primary_source_minified.xml",
-            f"{tempdir}/testprim.xml",
-        )
-        shutil.copy2(
-            # same as source but with all newlines removed
-            f"{TESTDATA}/test_replace_fl_source_minified.xml",
-            f"{tempdir}/testfl.xml",
-        )
-        rmdepcheck.replace_packages(f"{tempdir}/testprim.xml", f"{tempdir}/testfl.xml", "ccc")
-        gotprim = glob.glob(f"{tempdir}/*primary.xml*")[0]
-        gotfl = glob.glob(f"{tempdir}/*filelists.xml*")[0]
-        with open(gotprim, "rb") as gotpfh:
-            with open(
-                f"{TESTDATA}/test_replace_primary_source_minified_expected.xml", "rb"
-            ) as exppfh:
-                assert gotpfh.read() == exppfh.read()
-        with open(gotfl, "rb") as gotffh:
-            with open(f"{TESTDATA}/test_replace_fl_source_minified_expected.xml", "rb") as expffh:
-                assert gotffh.read() == expffh.read()
-
-
 def test_get_base_repoclosure():
     repo = f"file://{REPOS}/base"
     with open(f"{TESTDATA}/test_get_base_repoclosure.txt", "r", encoding="utf-8") as testfh:
@@ -230,31 +68,19 @@ def test_get_base_repoclosure():
     assert ret == expected
 
 
-def test_get_modified_repoclosure():
-    brepo = f"file://{REPOS}/base"
-    with open(f"{TESTDATA}/test_get_modified_repoclosure.txt", "r", encoding="utf-8") as testfh:
-        expected = testfh.read()
-        expected = expected.replace("{HASH}", rmdepcheck.hash_repo(brepo))
-    with tempfile.TemporaryDirectory() as tempdir:
-        ret = rmdepcheck.get_modified_repoclosure(
-            [brepo], tempdir, [], [f"file://{REPOS}/new"], ["aaa", "ccc", "eee", "fff", "ggg"]
-        )
-    assert ret == expected
-
-
-def test_get_new_repoclosure():
+def test_get_modified_and_new_repoclosure():
     brepo = f"file://{REPOS}/base"
     nrepo = f"file://{REPOS}/new"
+    with open(f"{TESTDATA}/test_get_modified_repoclosure.txt", "r", encoding="utf-8") as testfh:
+        expectedmod = testfh.read()
+        expectedmod = expectedmod.replace("{HASH}", rmdepcheck.hash_repo(brepo))
     with open(f"{TESTDATA}/test_get_new_repoclosure.txt", "r", encoding="utf-8") as testfh:
-        expected = testfh.read()
-        expected = expected.replace("{HASH}", rmdepcheck.hash_repo(nrepo))
-    with tempfile.TemporaryDirectory() as tempdir:
-        # populate the modified repo dir for accuracy
-        rmdepcheck.get_modified_repoclosure(
-            [brepo], tempdir, [], [f"file://{REPOS}/new"], ["aaa", "ccc", "eee", "fff", "ggg"]
-        )
-        ret = rmdepcheck.get_new_repoclosure([brepo], tempdir, [], nrepo)
-    assert ret == expected
+        expectednew = testfh.read()
+        expectednew = expectednew.replace("{HASH}", rmdepcheck.hash_repo(nrepo))
+    ret = rmdepcheck.get_modified_and_new_repoclosure(
+        [brepo], [], [f"file://{REPOS}/new"], ["aaa", "ccc", "eee", "fff", "ggg"]
+    )
+    assert ret == (expectedmod, expectednew)
 
 
 def test_get_source_packages():
@@ -341,10 +167,10 @@ def test_check_arch(_):
 @mock.patch("subprocess.run", autospec=True)
 def test_check_utils(mock_run):
     rmdepcheck.check_utils()
-    mock_run.side_effect = [None, FileNotFoundError]
+    mock_run.side_effect = FileNotFoundError
     with pytest.raises(SystemExit) as excinfo:
         rmdepcheck.check_utils()
-    assert excinfo.value.code == "Please install missing required utilities: curl"
+    assert excinfo.value.code == "Please install missing required utilities: dnf"
 
 
 @mock.patch("rmdepcheck.check_utils", side_effect=KeyboardInterrupt)

@@ -1,8 +1,6 @@
 black
 coverage
 diff-cover
-lxml
 mypy
 pylint
 pytest-cov
-types-lxml