extract_archive can now extract zip format.

This commit is contained in:
Matthieu Gautier 2017-02-22 14:19:50 +01:00
parent 00acba9ee9
commit f2c841df6d
1 changed files with 38 additions and 12 deletions

View File

@ -1,7 +1,8 @@
import os.path import os.path
import hashlib import hashlib
import tarfile import tarfile, zipfile
import tempfile import tempfile
import os
from collections import namedtuple, defaultdict from collections import namedtuple, defaultdict
pj = os.path.join pj = os.path.join
@ -63,29 +64,54 @@ class Context:
def extract_archive(archive_path, dest_dir, topdir=None, name=None): def extract_archive(archive_path, dest_dir, topdir=None, name=None):
with tarfile.open(archive_path) as archive: is_zip_archive = archive_path.endswith('.zip')
try:
if is_zip_archive:
archive = zipfile.ZipFile(archive_path)
members = archive.infolist()
getname = lambda info : info.filename
isdir = lambda info: info.filename.endswith('/')
else:
archive = tarfile.open(archive_path)
members = archive.getmembers() members = archive.getmembers()
getname = lambda info : getattr(info, 'name')
isdir = lambda info: info.isdir()
if not topdir: if not topdir:
for d in (m for m in members if m.isdir()): for d in (m for m in members if isdir(m)):
if not os.path.dirname(d.name): _name = getname(d)
if _name.endswith('/'):
_name = _name[:-1]
if not os.path.dirname(_name):
if topdir: if topdir:
# There is already a top dir. # There is already a top dir.
# Two topdirs in the same archive. # Two topdirs in the same archive.
# Extract all # Extract all
topdir = None topdir = None
break break
topdir = d topdir = _name
else:
topdir = archive.getmember(topdir)
if topdir: if topdir:
members_to_extract = [m for m in members if m.name.startswith(topdir.name+'/')] members_to_extract = [m for m in members if getname(m).startswith(topdir+'/')]
os.makedirs(dest_dir, exist_ok=True) os.makedirs(dest_dir, exist_ok=True)
with tempfile.TemporaryDirectory(prefix=os.path.basename(archive_path), dir=dest_dir) as tmpdir: with tempfile.TemporaryDirectory(prefix=os.path.basename(archive_path), dir=dest_dir) as tmpdir:
archive.extractall(path=tmpdir, members=members_to_extract) if is_zip_archive:
name = name or topdir.name _members_to_extract = [getname(m) for m in members_to_extract]
os.rename(pj(tmpdir, topdir.name), pj(dest_dir, name)) else:
_members_to_extract = members_to_extract
archive.extractall(path=tmpdir, members=_members_to_extract)
if is_zip_archive:
for member in members_to_extract:
if isdir(member):
continue
perm = (member.external_attr >> 16) & 0x1FF
os.chmod(pj(tmpdir, getname(member)), perm)
name = name or topdir
os.rename(pj(tmpdir, topdir), pj(dest_dir, name))
else: else:
if name: if name:
dest_dir = pj(dest_dir, name) dest_dir = pj(dest_dir, name)
os.makedirs(dest_dir) os.makedirs(dest_dir)
archive.extractall(path=dest_dir) archive.extractall(path=dest_dir)
finally:
if archive is not None:
archive.close()