kiwix-build/kiwixbuild/utils.py

385 lines
12 KiB
Python

import hashlib
import tarfile, zipfile
import tempfile
import shutil
import os, stat, sys
import urllib.request
import urllib.error
import ssl
import subprocess
import re
from pathlib import Path, PurePosixPath
from collections import namedtuple, defaultdict
from kiwixbuild._global import neutralEnv, option
# ANSI escape sequences looked up by `colorize`, keyed by status label.
# The empty-string key holds the reset sequence that `colorize` appends
# after the colored text.
COLORS = {
    "OK": "\033[92m",  # SGR 92: bright green
    "WARNING": "\033[93m",  # SGR 93: bright yellow
    "NEEDED": "\033[93m",  # SGR 93: bright yellow (same as WARNING)
    "SKIP": "\033[34m",  # SGR 34: blue
    "ERROR": "\033[91m",  # SGR 91: bright red
    "": "\033[0m",  # SGR 0: reset all attributes
}
# Base URL that `Remotefile` prepends to a file name when no explicit url is
# given.
# NOTE(review): this is plain http; `download_remote` only verifies content
# when a sha256 is set on the Remotefile -- confirm this is acceptable.
REMOTE_PREFIX = "http://mirror.download.kiwix.org/dev/kiwix-build/"
def which(name):
    """Return the location printed by the ``which`` command for ``name``.

    ``name`` is shell-quoted before being placed in the command line so that
    a value containing shell metacharacters (spaces, ``;``, ``$`` ...) is
    passed to ``which`` as a single argument instead of being interpreted by
    the shell. Plain program names are left untouched by the quoting.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status (e.g. the program is not found).
    """
    # Local import: keeps this function self-contained without touching the
    # module import block.
    import shlex

    command = f"which {shlex.quote(str(name))}"
    output = subprocess.check_output(command, shell=True)
    # Drop the trailing line terminator, whether it is "\n" or "\r\n".
    return output.decode().rstrip("\r\n")
def xrun_find(name):
    """Return the location printed by ``xcrun -find`` for the tool ``name``.

    ``name`` is shell-quoted before being placed in the command line so that
    it always reaches ``xcrun`` as a single argument and cannot be
    interpreted by the shell.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status.
    """
    # Local import: keeps this function self-contained without touching the
    # module import block.
    import shlex

    command = f"xcrun -find {shlex.quote(str(name))}"
    output = subprocess.check_output(command, shell=True)
    # Drop the trailing line terminator, whether it is "\n" or "\r\n".
    return output.decode().rstrip("\r\n")
# Matches a single space that is not already preceded by a backslash.
regex_space = re.compile(r"((?<!\\) )")


def escape_path(path: Path):
    """Return ``path`` as a string with every unescaped space backslash-escaped.

    Spaces that already follow a backslash are left as they are.
    """
    return regex_space.sub(r"\ ", str(path))
class Defaultdict(defaultdict):
    """A ``defaultdict`` whose keys can also be read as attributes.

    ``d.foo`` is resolved as ``d["foo"]`` whenever normal attribute lookup
    fails, so a missing key is created through the default factory exactly
    as with item access.
    """

    def __getattr__(self, name):
        # Only reached when regular attribute lookup has failed.
        return self.__getitem__(name)
class DefaultEnv(Defaultdict):
    """Environment mapping seeded from ``os.environ``.

    Missing variables default to the empty string. The path-list variables
    ``PATH``, ``PKG_CONFIG_PATH`` and ``LD_LIBRARY_PATH`` are converted to
    ``PathArray`` on first access and stored back in that form.
    """

    def __init__(self):
        super().__init__(str, os.environ)

    def __getitem__(self, name):
        # The bytes key must report "missing" rather than yield the default
        # empty string.
        # NOTE(review): presumably for stdlib lookups of env[b"PATH"] (e.g.
        # os.get_exec_path) -- confirm.
        if name == b"PATH":
            raise KeyError
        value = super().__getitem__(name)
        if name not in ("PATH", "PKG_CONFIG_PATH", "LD_LIBRARY_PATH"):
            return value
        if not isinstance(value, PathArray):
            # First access: wrap the raw string and cache the wrapped form.
            value = PathArray(value)
            self[name] = value
        return value
def get_separator():
    """Return the path-list separator: ``";"`` when the neutral environment's
    ``distname`` is ``"Windows"``, ``":"`` otherwise.
    """
    if neutralEnv("distname") == "Windows":
        return ";"
    return ":"
class PathArray(list):
    """List of path entries built from a separator-joined string.

    The separator is taken from ``get_separator()`` at construction time and
    reused by ``__str__`` to join the entries back together.
    """

    def __init__(self, value):
        self.separator = get_separator()
        # A falsy value (empty string, None) yields an empty list.
        entries = value.split(self.separator) if value else []
        super().__init__(entries)

    def __str__(self):
        return self.separator.join(map(str, self))
def remove_duplicates(iterable, key_function=None):
    """Yield the elements of ``iterable`` in order, skipping repeated ones.

    Two elements are considered the same when ``key_function`` returns equal
    (hashable) keys for them; without ``key_function`` the element itself is
    the key. The first element seen for each key is the one yielded.
    """
    extract_key = key_function if key_function is not None else (lambda item: item)
    already_yielded = set()
    for item in iterable:
        marker = extract_key(item)
        if marker not in already_yielded:
            already_yielded.add(marker)
            yield item
def get_sha256(path: Path):
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in 8 KiB chunks; after each chunk the next character of
    a four-step spinner is handed to ``print_progress``.
    """
    spinner = "/-\\|"
    chunk_size = 1024 * 8
    digest = hashlib.sha256()
    step = 0
    with path.open("br") as stream:
        while chunk := stream.read(chunk_size):
            digest.update(chunk)
            print_progress(spinner[step])
            step = (step + 1) % 4
    return digest.hexdigest()
def colorize(text, color=None):
    """Wrap ``text`` in the escape sequence registered in ``COLORS``.

    ``color`` is the ``COLORS`` key to use; when omitted, ``text`` itself is
    used as the key. The reset sequence (``COLORS[""]``) is appended.
    """
    palette_key = text if color is None else color
    return f"{COLORS[palette_key]}{text}{COLORS['']}"
def print_progress(progress):
    """Print ``progress`` in place when the ``show_progress`` option is set.

    The text is followed by a cursor-left escape of the same length, so the
    next call overwrites it. Nothing is printed when the option is off.
    """
    if not option("show_progress"):
        return
    print(f"{progress}\033[{len(progress)}D", end="")
def add_execution_right(file_path: Path):
    """Add the owner-execute bit to the permissions of ``file_path``.

    The current permission bits are read with ``lstat`` and written back
    with ``stat.S_IXUSR`` added.
    """
    mode = stat.S_IMODE(file_path.lstat().st_mode)
    file_path.chmod(mode | stat.S_IXUSR)
def copy_tree(src: Path, dst: Path, post_copy_function=None):
dst.mkdir(parents=True, exist_ok=True)
for root, dirs, files in src.walk():
r = root.relative_to(src)
dstdir = dst / r
dstdir.mkdir(exist_ok=True)
for f in files:
dstfile = dstdir / f
shutil.copy2(root / f, dstfile, follow_symlinks=False)
if post_copy_function is not None:
post_copy_function(dstfile)
class Remotefile(namedtuple("Remotefile", ("name", "sha256", "url"))):
    """Description of a downloadable file: ``(name, sha256, url)``.

    When ``url`` is not given it defaults to ``REMOTE_PREFIX + name``.
    """

    def __new__(cls, name, sha256, url=None):
        resolved_url = REMOTE_PREFIX + name if url is None else url
        return super().__new__(cls, name, sha256, resolved_url)
def download_remote(what: Remotefile, where: Path):
    """Download ``what.url`` to ``where / what.name`` and verify its sha256.

    If the target file already exists with the expected sha256, nothing is
    downloaded and ``SkipCommand`` is raised; an existing file with a
    different digest is deleted and downloaded again.

    Certificate verification is disabled when the ``no_cert_check`` option
    is set.

    Raises:
        SkipCommand: the file is already present with the right sha256.
        StopBuild: the url cannot be downloaded, or the downloaded file does
            not match ``what.sha256`` (the file is removed in that case).
    """
    file_path = where / what.name
    if file_path.exists():
        if what.sha256 == get_sha256(file_path):
            raise SkipCommand()
        file_path.unlink()

    if option("no_cert_check"):
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    else:
        context = None

    batch_size = 1024 * 8
    progress_chars = "/-\\|"
    try:
        with urllib.request.urlopen(
            what.url, context=context
        ) as resource, file_path.open("wb") as file:
            tsize = resource.info().get("Content-Length", None)
            if tsize is not None:
                tsize = int(tsize)
            received = 0
            spinner_index = 0
            while True:
                batch = resource.read(batch_size)
                if not batch:
                    break
                if tsize:
                    # Count the bytes actually read: the last batch (or any
                    # short read) is smaller than batch_size, and counting
                    # batch_size would push the percentage above 100%.
                    received += len(batch)
                    print_progress(f"{received/tsize:.2%}")
                else:
                    # Unknown (or zero) total size: show a spinner instead.
                    print_progress(progress_chars[spinner_index])
                    spinner_index = (spinner_index + 1) % 4
                file.write(batch)
    except urllib.error.URLError as e:
        print(f"Cannot download url {what.url}:\n{e.reason}")
        raise StopBuild() from e

    if not what.sha256:
        print(f"Sha256 for {what.name} not set, do no verify download")
    elif what.sha256 != get_sha256(file_path):
        file_path.unlink()
        raise StopBuild("Sha 256 doesn't correspond")
class BaseCommandResult(Exception):
    """Base exception carrying a message in ``msg``.

    ``str()`` of the exception is that message (empty by default).
    """

    def __init__(self, msg=""):
        self.msg = msg

    def __str__(self):
        return self.msg
class SkipCommand(BaseCommandResult):
    """Raised to signal that a command is skipped.

    Renders as the colorized ``SKIP`` label, followed by `` : <msg>`` when a
    message was given.
    """

    def __str__(self):
        label = colorize("SKIP")
        if not self.msg:
            return label
        return label + f" : {self.msg}"
class WarningMessage(BaseCommandResult):
    """Command result rendered as the colorized ``WARNING`` label followed
    by `` : <msg>``.
    """

    def __str__(self):
        label = colorize("WARNING")
        return f"{label} : {self.msg}"
class StopBuild(BaseCommandResult):
    """Command result with no behavior of its own beyond ``BaseCommandResult``.

    Raised by ``download_remote`` when a download fails or its sha256 does
    not match.
    """
class Context:
    """State attached to the execution of one command.

    Holds the command name, its log file, the ``force_native_build`` flag
    and the optional "autoskip" marker file: a file whose presence makes
    ``try_skip`` raise ``SkipCommand`` and which ``_finalise`` creates.
    """

    def __init__(self, command_name, log_file, force_native_build):
        self.command_name = command_name
        self.log_file = log_file
        self.force_native_build = force_native_build
        # Marker file path, set by try_skip(); None until then.
        self.autoskip_file = None
        # When true, try_skip() does nothing.
        self.no_skip = False

    def skip(self, msg=""):
        """Unconditionally raise ``SkipCommand`` with ``msg``."""
        raise SkipCommand(msg)

    def try_skip(self, path: Path, extra_name=""):
        """Raise ``SkipCommand`` if the marker file exists under ``path``.

        The marker is named ``.<command_name>[_<extra_name>]_ok`` and is
        remembered in ``autoskip_file``. Does nothing when ``no_skip`` is set.
        """
        if self.no_skip:
            return
        suffix = f"_{extra_name}" if extra_name else ""
        marker = path / f".{self.command_name}{suffix}_ok"
        self.autoskip_file = marker
        if marker.exists():
            raise SkipCommand()

    def _finalise(self):
        """Create the marker file (and its parent directories), if one was set."""
        marker = self.autoskip_file
        if marker is None:
            return
        marker.parent.mkdir(parents=True, exist_ok=True)
        marker.touch()
def extract_archive(archive_path: Path, dest_dir: Path, topdir=None, name=None):
    """Extract an archive to dest_dir.

    The archive is opened as a zip when its suffix is ".zip", and with
    `tarfile.open` otherwise.

    If topdir is given, only this directory (and sub files/dirs) will be
    extracted. If topdir is not given, it is detected: when the archive
    contains exactly one top-level directory entry, that directory is used
    as topdir; when a second one is found, the whole archive is extracted.

    If name is given, the topdir will be extracted under this name. When
    there is no topdir, the whole archive is extracted into `dest_dir / name`,
    which is created and must not already exist.

    The archive is always closed before returning.
    """
    is_zip_archive = archive_path.suffix == ".zip"
    archive = None
    try:
        # Hide the zip/tar API differences behind `members`, `getname`
        # and `isdir`.
        if is_zip_archive:
            archive = zipfile.ZipFile(archive_path)
            members = archive.infolist()
            getname = lambda info: info.filename
            isdir = lambda info: info.filename.endswith("/")
        else:
            archive = tarfile.open(archive_path)
            members = archive.getmembers()
            getname = lambda info: getattr(info, "name")
            isdir = lambda info: info.isdir()
        if not topdir:
            # Look for a unique top-level directory among directory entries.
            for d in (m for m in members if isdir(m)):
                _name = getname(d)
                if _name.endswith("/"):
                    _name = _name[:-1]
                # No parent component: this is a top-level directory.
                if not os.path.dirname(_name):
                    if topdir:
                        # There is already a top dir.
                        # Two topdirs in the same archive.
                        # Extract all
                        topdir = None
                        break
                    topdir = _name
        if topdir:
            # Only entries located under topdir (the topdir entry itself is
            # not selected by this prefix test).
            members_to_extract = [
                m for m in members if getname(m).startswith(topdir + "/")
            ]
            dest_dir.mkdir(parents=True, exist_ok=True)
            # Extract into a temporary directory created inside dest_dir,
            # then copy topdir to its final name.
            with tempfile.TemporaryDirectory(
                prefix=archive_path.name, dir=dest_dir
            ) as tmpdir:
                tmpdir_path = Path(tmpdir)
                # ZipFile.extractall is given member names, TarFile.extractall
                # the member objects.
                if is_zip_archive:
                    _members_to_extract = [getname(m) for m in members_to_extract]
                else:
                    _members_to_extract = members_to_extract
                archive.extractall(path=tmpdir_path, members=_members_to_extract)
                if is_zip_archive:
                    # Re-apply the permission bits recorded in the zip entry:
                    # the low 9 bits of the high 16 bits of external_attr.
                    # Entries with no recorded permission are left untouched.
                    for member in members_to_extract:
                        if isdir(member):
                            continue
                        perm = (member.external_attr >> 16) & 0x1FF
                        if perm:
                            (tmpdir_path / getname(member)).chmod(perm)
                name = name or topdir
                # Symlinks are copied as symlinks; an existing destination
                # directory is merged into.
                shutil.copytree(
                    tmpdir_path / topdir,
                    dest_dir / name,
                    symlinks=True,
                    dirs_exist_ok=True,
                )
                # Make sure every directory in tmpdir is readable, writable
                # and traversable by the owner so the temporary directory can
                # be deleted correctly.
                for root, dirs, _files in tmpdir_path.walk():
                    for d in dirs:
                        (root / d).chmod(stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
        else:
            # No single top-level directory: extract the whole archive.
            if name:
                dest_dir = dest_dir / name
                # No exist_ok here: this fails if the directory already exists.
                dest_dir.mkdir(parents=True)
            archive.extractall(path=dest_dir)
    finally:
        if archive is not None:
            archive.close()
def win_to_posix_path(p: Path or str) -> str:
if isinstance(p, Path):
return str(PurePosixPath(p)).replace("C:/", "/c/")
return p
def run_command(command, cwd, context, *, env=None, input=None, force_posix_path=False):
    """Run ``command`` in ``cwd`` and raise if it exits with a non-zero status.

    ``cwd`` is created if missing. ``env`` defaults to a fresh
    ``DefaultEnv``. With ``force_posix_path``, command arguments and
    environment values are passed through ``win_to_posix_path``; everything
    is then converted to ``str``.

    Unless the ``verbose`` option is set, the command description and the
    process output (stdout and stderr merged) go to ``context.log_file``;
    otherwise they go to standard output.

    ``input``, when truthy, is encoded and sent to the process stdin. While
    the process runs, a ``.`` is printed every 30 seconds.

    Raises:
        subprocess.CalledProcessError: the process returned a non-zero code.
    """
    os.makedirs(cwd, exist_ok=True)
    env = DefaultEnv() if env is None else env
    convert = win_to_posix_path if force_posix_path else (lambda value: value)
    command = [str(convert(arg)) for arg in command]
    log = None
    try:
        if not option("verbose"):
            log = open(context.log_file, "w")
        # With log left as None, print() falls back to standard output.
        print(f"run command '{command}'", file=log)
        print(f"current directory is '{cwd}'", file=log)
        print("env is :", file=log)
        env = {key: str(convert(value)) for key, value in env.items()}
        for key, value in env.items():
            print(f" {key} : {value!r}", file=log)
        if log:
            # Flush our own lines before the child starts writing to the file.
            log.flush()

        process = subprocess.Popen(
            command,
            cwd=cwd,
            env=env,
            stdin=subprocess.PIPE if input else None,
            stdout=log or sys.stdout,
            stderr=subprocess.STDOUT,
        )
        payload = input.encode() if input else input
        while True:
            try:
                if payload is None:
                    process.wait(timeout=30)
                else:
                    process.communicate(payload, timeout=30)
                break
            except subprocess.TimeoutExpired:
                # Either `wait` timed out (nothing to send) or `communicate`
                # timed out; in the latter case drop the payload so we do not
                # communicate again and simply wait from now on.
                payload = None
                print(".", end="", flush=True)

        if process.returncode:
            raise subprocess.CalledProcessError(process.returncode, command)
    finally:
        if log:
            log.close()