import os.path import hashlib import tarfile, zipfile import tempfile import shutil import os, stat, sys import urllib.request import urllib.error import ssl import subprocess import re from collections import namedtuple, defaultdict from kiwixbuild._global import neutralEnv, option pj = os.path.join REMOTE_PREFIX = 'http://download.kiwix.org/dev/' def which(name): command = "which {}".format(name) output = subprocess.check_output(command, shell=True) return output[:-1].decode() def xrun_find(name): command = "xcrun -find {}".format(name) output = subprocess.check_output(command, shell=True) return output[:-1].decode() regex_space = re.compile(r'((?= (3, 4, 3) else {} progress_chars = "/-\|" try: with urllib.request.urlopen(file_url, **extra_args) as resource, open(file_path, 'wb') as file: tsize = resource.info().get('Content-Length', None) if tsize is not None: tsize = int(tsize) current = 0 while True: batch = resource.read(batch_size) if not batch: break if tsize: current += batch_size print_progress("{:.2%}".format(current/tsize)) else: print_progress(progress_chars[current]) current = (current+1)%4 file.write(batch) except urllib.error.URLError as e: print("Cannot download url {}:\n{}".format(file_url, e.reason)) raise StopBuild() if not what.sha256: print('Sha256 for {} not set, do no verify download'.format(what.name)) elif what.sha256 != get_sha256(file_path): os.remove(file_path) raise StopBuild() class SkipCommand(Exception): pass class StopBuild(Exception): pass class Remotefile(namedtuple('Remotefile', ('name', 'sha256', 'url'))): def __new__(cls, name, sha256, url=None): return super().__new__(cls, name, sha256, url) class Context: def __init__(self, command_name, log_file, force_native_build): self.command_name = command_name self.log_file = log_file self.force_native_build = force_native_build self.autoskip_file = None def try_skip(self, path, extra_name=""): if extra_name: extra_name = "_{}".format(extra_name) self.autoskip_file = pj(path, ".{}{}_ok".format(self.command_name, extra_name)) if os.path.exists(self.autoskip_file): raise SkipCommand() def _finalise(self): if self.autoskip_file is not None: with open(self.autoskip_file, 'w'): pass def extract_archive(archive_path, dest_dir, topdir=None, name=None): is_zip_archive = archive_path.endswith('.zip') archive = None try: if is_zip_archive: archive = zipfile.ZipFile(archive_path) members = archive.infolist() getname = lambda info : info.filename isdir = lambda info: info.filename.endswith('/') else: archive = tarfile.open(archive_path) members = archive.getmembers() getname = lambda info : getattr(info, 'name') isdir = lambda info: info.isdir() if not topdir: for d in (m for m in members if isdir(m)): _name = getname(d) if _name.endswith('/'): _name = _name[:-1] if not os.path.dirname(_name): if topdir: # There is already a top dir. # Two topdirs in the same archive. # Extract all topdir = None break topdir = _name if topdir: members_to_extract = [m for m in members if getname(m).startswith(topdir+'/')] os.makedirs(dest_dir, exist_ok=True) with tempfile.TemporaryDirectory(prefix=os.path.basename(archive_path), dir=dest_dir) as tmpdir: if is_zip_archive: _members_to_extract = [getname(m) for m in members_to_extract] else: _members_to_extract = members_to_extract archive.extractall(path=tmpdir, members=_members_to_extract) if is_zip_archive: for member in members_to_extract: if isdir(member): continue perm = (member.external_attr >> 16) & 0x1FF os.chmod(pj(tmpdir, getname(member)), perm) name = name or topdir os.rename(pj(tmpdir, topdir), pj(dest_dir, name)) else: if name: dest_dir = pj(dest_dir, name) os.makedirs(dest_dir) archive.extractall(path=dest_dir) finally: if archive is not None: archive.close() def run_command(command, cwd, context, buildEnv=None, env=None, input=None, cross_env_only=False): os.makedirs(cwd, exist_ok=True) if env is None: env = DefaultEnv() if buildEnv is not None: cross_compile_env = True cross_compile_compiler = True cross_compile_path = True if context.force_native_build: cross_compile_env = False cross_compile_compiler = False cross_compile_path = False if cross_env_only: cross_compile_compiler = False env = buildEnv._set_env(env, cross_compile_env, cross_compile_compiler, cross_compile_path) log = None try: if not option('verbose'): log = open(context.log_file, 'w') print("run command '{}'".format(command), file=log) print("current directory is '{}'".format(cwd), file=log) print("env is :", file=log) for k, v in env.items(): print(" {} : {!r}".format(k, v), file=log) kwargs = dict() if input: kwargs['stdin'] = subprocess.PIPE process = subprocess.Popen(command, cwd=cwd, env=env, stdout=log or sys.stdout, stderr=subprocess.STDOUT, **kwargs) if input: process.communicate(input.encode()) retcode = process.wait() if retcode: raise subprocess.CalledProcessError(retcode, command) finally: if log: log.close()