import os.path import hashlib import tarfile, zipfile import tempfile import shutil import os, stat, sys import urllib.request import urllib.error import ssl import subprocess import re from collections import namedtuple, defaultdict from kiwixbuild._global import neutralEnv, option def pj(*args): return os.path.normpath(os.path.join(*args)) COLORS = { "OK": "\033[92m", "WARNING": "\033[93m", "NEEDED": "\033[93m", "SKIP": "\033[34m", "ERROR": "\033[91m", "": "\033[0m", } REMOTE_PREFIX = "http://mirror.download.kiwix.org/dev/kiwix-build/" def which(name): command = "which {}".format(name) output = subprocess.check_output(command, shell=True) return output[:-1].decode() def xrun_find(name): command = "xcrun -find {}".format(name) output = subprocess.check_output(command, shell=True) return output[:-1].decode() regex_space = re.compile(r"((?= (3, 4, 3) else {} progress_chars = "/-\\|" try: with urllib.request.urlopen(what.url, **extra_args) as resource, open( file_path, "wb" ) as file: tsize = resource.info().get("Content-Length", None) if tsize is not None: tsize = int(tsize) current = 0 while True: batch = resource.read(batch_size) if not batch: break if tsize: current += batch_size print_progress("{:.2%}".format(current / tsize)) else: print_progress(progress_chars[current]) current = (current + 1) % 4 file.write(batch) except urllib.error.URLError as e: print("Cannot download url {}:\n{}".format(what.url, e.reason)) raise StopBuild() if not what.sha256: print("Sha256 for {} not set, do no verify download".format(what.name)) elif what.sha256 != get_sha256(file_path): os.remove(file_path) raise StopBuild("Sha 256 doesn't correspond") class BaseCommandResult(Exception): def __init__(self, msg=""): self.msg = msg def __str__(self): return self.msg class SkipCommand(BaseCommandResult): def __str__(self): if self.msg: return colorize("SKIP") + " : {}".format(self.msg) return colorize("SKIP") class WarningMessage(BaseCommandResult): def __str__(self): return colorize("WARNING") + " : {}".format(self.msg) class StopBuild(BaseCommandResult): pass class Remotefile(namedtuple("Remotefile", ("name", "sha256", "url"))): def __new__(cls, name, sha256, url=None): if url is None: url = REMOTE_PREFIX + name return super().__new__(cls, name, sha256, url) class Context: def __init__(self, command_name, log_file, force_native_build): self.command_name = command_name self.log_file = log_file self.force_native_build = force_native_build self.autoskip_file = None self.no_skip = False def skip(self, msg=""): raise SkipCommand(msg) def try_skip(self, path, extra_name=""): if self.no_skip: return if extra_name: extra_name = "_{}".format(extra_name) self.autoskip_file = pj(path, ".{}{}_ok".format(self.command_name, extra_name)) if os.path.exists(self.autoskip_file): raise SkipCommand() def _finalise(self): if self.autoskip_file is not None: os.makedirs(os.path.dirname(self.autoskip_file), exist_ok=True) with open(self.autoskip_file, "w"): pass def extract_archive(archive_path, dest_dir, topdir=None, name=None): is_zip_archive = archive_path.endswith(".zip") archive = None try: if is_zip_archive: archive = zipfile.ZipFile(archive_path) members = archive.infolist() getname = lambda info: info.filename isdir = lambda info: info.filename.endswith("/") else: archive = tarfile.open(archive_path) members = archive.getmembers() getname = lambda info: getattr(info, "name") isdir = lambda info: info.isdir() if not topdir: for d in (m for m in members if isdir(m)): _name = getname(d) if _name.endswith("/"): _name = _name[:-1] if not os.path.dirname(_name): if topdir: # There is already a top dir. # Two topdirs in the same archive. # Extract all topdir = None break topdir = _name if topdir: members_to_extract = [ m for m in members if getname(m).startswith(topdir + "/") ] os.makedirs(dest_dir, exist_ok=True) with tempfile.TemporaryDirectory( prefix=os.path.basename(archive_path), dir=dest_dir ) as tmpdir: if is_zip_archive: _members_to_extract = [getname(m) for m in members_to_extract] else: _members_to_extract = members_to_extract archive.extractall(path=tmpdir, members=_members_to_extract) if is_zip_archive: for member in members_to_extract: if isdir(member): continue perm = (member.external_attr >> 16) & 0x1FF os.chmod(pj(tmpdir, getname(member)), perm) name = name or topdir os.rename(pj(tmpdir, topdir), pj(dest_dir, name)) else: if name: dest_dir = pj(dest_dir, name) os.makedirs(dest_dir) archive.extractall(path=dest_dir) finally: if archive is not None: archive.close() def run_command(command, cwd, context, *, env=None, input=None): os.makedirs(cwd, exist_ok=True) if env is None: env = DefaultEnv() log = None try: if not option("verbose"): log = open(context.log_file, "w") print("run command '{}'".format(command), file=log) print("current directory is '{}'".format(cwd), file=log) print("env is :", file=log) for k, v in env.items(): print(" {} : {!r}".format(k, v), file=log) if log: log.flush() kwargs = dict() if input: kwargs["stdin"] = subprocess.PIPE process = subprocess.Popen( command, cwd=cwd, env=env, stdout=log or sys.stdout, stderr=subprocess.STDOUT, **kwargs ) if input: input = input.encode() while True: try: if input is None: process.wait(timeout=30) else: process.communicate(input, timeout=30) except subprocess.TimeoutExpired: # Either `wait` timeout (and `input` is None) or # `communicate` timeout (and we must set `input` to None # to not communicate again). input = None print(".", end="", flush=True) else: break if process.returncode: raise subprocess.CalledProcessError(process.returncode, command) finally: if log: log.close()