Format our code with black

This commit is contained in:
Matthieu Gautier
2024-02-05 11:41:09 +01:00
parent 939f323709
commit 5a1175cf2d
51 changed files with 1916 additions and 1518 deletions

View File

@ -17,16 +17,16 @@ pj = os.path.join
COLORS = {
'OK': '\033[92m',
'WARNING': '\033[93m',
'NEEDED': '\033[93m',
'SKIP': '\033[34m',
'ERROR': '\033[91m',
'': '\033[0m',
"OK": "\033[92m",
"WARNING": "\033[93m",
"NEEDED": "\033[93m",
"SKIP": "\033[34m",
"ERROR": "\033[91m",
"": "\033[0m",
}
REMOTE_PREFIX = 'http://mirror.download.kiwix.org/dev/kiwix-build/'
REMOTE_PREFIX = "http://mirror.download.kiwix.org/dev/kiwix-build/"
def which(name):
@ -34,16 +34,19 @@ def which(name):
output = subprocess.check_output(command, shell=True)
return output[:-1].decode()
def xrun_find(name):
command = "xcrun -find {}".format(name)
output = subprocess.check_output(command, shell=True)
return output[:-1].decode()
regex_space = re.compile(r'((?<!\\) )')
regex_space = re.compile(r"((?<!\\) )")
def escape_path(path):
path = str(path)
return regex_space.sub(r'\ ', path)
return regex_space.sub(r"\ ", path)
class Defaultdict(defaultdict):
@ -56,7 +59,7 @@ class DefaultEnv(Defaultdict):
super().__init__(str, os.environ)
def __getitem__(self, name):
if name == b'PATH':
if name == b"PATH":
raise KeyError
return super().__getitem__(name)
@ -78,25 +81,25 @@ def get_sha256(path):
current = 0
batch_size = 1024 * 8
sha256 = hashlib.sha256()
with open(path, 'br') as f:
with open(path, "br") as f:
while True:
batch = f.read(batch_size)
if not batch:
break
sha256.update(batch)
print_progress(progress_chars[current])
current = (current+1)%4
current = (current + 1) % 4
return sha256.hexdigest()
def colorize(text, color=None):
if color is None:
color = text
return "{}{}{}".format(COLORS[color], text, COLORS[''])
return "{}{}{}".format(COLORS[color], text, COLORS[""])
def print_progress(progress):
if option('show_progress'):
if option("show_progress"):
text = "{}\033[{}D".format(progress, len(progress))
print(text, end="")
@ -126,18 +129,20 @@ def download_remote(what, where):
raise SkipCommand()
os.remove(file_path)
if option('no_cert_check'):
if option("no_cert_check"):
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
else:
context = None
batch_size = 1024 * 8
extra_args = {'context':context} if sys.version_info >= (3, 4, 3) else {}
extra_args = {"context": context} if sys.version_info >= (3, 4, 3) else {}
progress_chars = "/-\|"
try:
with urllib.request.urlopen(what.url, **extra_args) as resource, open(file_path, 'wb') as file:
tsize = resource.info().get('Content-Length', None)
with urllib.request.urlopen(what.url, **extra_args) as resource, open(
file_path, "wb"
) as file:
tsize = resource.info().get("Content-Length", None)
if tsize is not None:
tsize = int(tsize)
current = 0
@ -147,17 +152,17 @@ def download_remote(what, where):
break
if tsize:
current += batch_size
print_progress("{:.2%}".format(current/tsize))
print_progress("{:.2%}".format(current / tsize))
else:
print_progress(progress_chars[current])
current = (current+1)%4
print_progress(progress_chars[current])
current = (current + 1) % 4
file.write(batch)
except urllib.error.URLError as e:
print("Cannot download url {}:\n{}".format(what.url, e.reason))
raise StopBuild()
if not what.sha256:
print('Sha256 for {} not set, do no verify download'.format(what.name))
print("Sha256 for {} not set, do no verify download".format(what.name))
elif what.sha256 != get_sha256(file_path):
os.remove(file_path)
raise StopBuild("Sha 256 doesn't correspond")
@ -187,7 +192,7 @@ class StopBuild(BaseCommandResult):
pass
class Remotefile(namedtuple('Remotefile', ('name', 'sha256', 'url'))):
class Remotefile(namedtuple("Remotefile", ("name", "sha256", "url"))):
def __new__(cls, name, sha256, url=None):
if url is None:
url = REMOTE_PREFIX + name
@ -217,28 +222,28 @@ class Context:
def _finalise(self):
if self.autoskip_file is not None:
os.makedirs(os.path.dirname(self.autoskip_file), exist_ok=True)
with open(self.autoskip_file, 'w'):
with open(self.autoskip_file, "w"):
pass
def extract_archive(archive_path, dest_dir, topdir=None, name=None):
is_zip_archive = archive_path.endswith('.zip')
is_zip_archive = archive_path.endswith(".zip")
archive = None
try:
if is_zip_archive:
archive = zipfile.ZipFile(archive_path)
members = archive.infolist()
getname = lambda info : info.filename
isdir = lambda info: info.filename.endswith('/')
getname = lambda info: info.filename
isdir = lambda info: info.filename.endswith("/")
else:
archive = tarfile.open(archive_path)
members = archive.getmembers()
getname = lambda info : getattr(info, 'name')
getname = lambda info: getattr(info, "name")
isdir = lambda info: info.isdir()
if not topdir:
for d in (m for m in members if isdir(m)):
_name = getname(d)
if _name.endswith('/'):
if _name.endswith("/"):
_name = _name[:-1]
if not os.path.dirname(_name):
if topdir:
@ -249,9 +254,13 @@ def extract_archive(archive_path, dest_dir, topdir=None, name=None):
break
topdir = _name
if topdir:
members_to_extract = [m for m in members if getname(m).startswith(topdir+'/')]
members_to_extract = [
m for m in members if getname(m).startswith(topdir + "/")
]
os.makedirs(dest_dir, exist_ok=True)
with tempfile.TemporaryDirectory(prefix=os.path.basename(archive_path), dir=dest_dir) as tmpdir:
with tempfile.TemporaryDirectory(
prefix=os.path.basename(archive_path), dir=dest_dir
) as tmpdir:
if is_zip_archive:
_members_to_extract = [getname(m) for m in members_to_extract]
else:
@ -281,8 +290,8 @@ def run_command(command, cwd, context, *, env=None, input=None):
env = DefaultEnv()
log = None
try:
if not option('verbose'):
log = open(context.log_file, 'w')
if not option("verbose"):
log = open(context.log_file, "w")
print("run command '{}'".format(command), file=log)
print("current directory is '{}'".format(cwd), file=log)
print("env is :", file=log)
@ -293,8 +302,15 @@ def run_command(command, cwd, context, *, env=None, input=None):
log.flush()
kwargs = dict()
if input:
kwargs['stdin'] = subprocess.PIPE
process = subprocess.Popen(command, cwd=cwd, env=env, stdout=log or sys.stdout, stderr=subprocess.STDOUT, **kwargs)
kwargs["stdin"] = subprocess.PIPE
process = subprocess.Popen(
command,
cwd=cwd,
env=env,
stdout=log or sys.stdout,
stderr=subprocess.STDOUT,
**kwargs
)
if input:
input = input.encode()
while True:
@ -308,7 +324,7 @@ def run_command(command, cwd, context, *, env=None, input=None):
# `communicate` timeout (and we must set `input` to None
# to not communicate again).
input = None
print('.', end='', flush=True)
print(".", end="", flush=True)
else:
break
if process.returncode: