import json
import logging
import os
import re
from datetime import datetime
from itertools import groupby
from subprocess import PIPE, Popen

from django.conf import settings
from django.db.models import F
from django.utils.timezone import make_aware, now

import mediafile
import requests
from bs4 import BeautifulSoup as bs
from PIL import Image

from drupal2spip_lal.drupal import models as drupal
from drupal2spip_lal.spip import models as spip

logger = logging.getLogger('drupal2spip_lal')


def convert_timestamp(timestamp):
    return make_aware(datetime.fromtimestamp(timestamp))


def strong_to_dl(html):
    """
    Marie-Odile gives us dl-like markup built out of strong tags.
    Parsing HTML with regexes is wrong, but we try to rebuild the dl
    before breaking the layout with bs.
    """
    is_strong = r'<strong>(?P<dt>.+)</strong>(?P<dd>.*)$'

    def is_strong_item(s):
        return bool(re.match(is_strong, s))

    items = re.split(r'[\r\n]+', html)
    grouped_items = groupby(items, key=is_strong_item)
    r = []
    for key, group in grouped_items:
        group = list(group)
        if key and len(group) > 2:
            dl = ['<dl>']
            for elem in group:
                match = re.match(is_strong, elem).groupdict()
                dl += [
                    '<dt>{}</dt>'.format(match['dt'].strip()),
                    '<dd>{}</dd>'.format(match['dd'].strip()),
                ]
            dl.append('</dl>')
            r += dl
        else:
            r += group
    return '\n'.join(r)


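# Hedged usage sketch for strong_to_dl, not executed at import time: the
# sample lines are made up. More than two consecutive '<strong>...</strong>'
# lines are rebuilt into a single definition list.
def _strong_to_dl_example():
    sample = (
        '<strong>Lieu</strong> Salle des fêtes\n'
        '<strong>Heure</strong> 19h\n'
        '<strong>Prix</strong> Libre'
    )
    # Expected result, one tag per line:
    # <dl> / <dt>Lieu</dt> / <dd>Salle des fêtes</dd> / ... / </dl>
    return strong_to_dl(sample)

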
def probe_video(path):
    with Popen(
        ['ffprobe', '-print_format', 'json', '-show_streams', path],
        stdout=PIPE,
        stderr=PIPE,
    ) as proc:
        streams = json.load(proc.stdout)['streams']
        return [s for s in streams if s["codec_type"] == "video"]


def download(src, filename, force_download):
    if src and src.startswith('/'):
        src = 'http://{}{}'.format(settings.DRUPAL_FQDN, src)
    elif src and src.startswith('sites/default/files/'):
        src = 'http://{}/{}'.format(settings.DRUPAL_FQDN, src)
    elif src and src.startswith('files/'):
        src = 'http://{}/{}'.format(settings.DRUPAL_FQDN, src)
    elif not re.match(r'^(https?)?://', src):
        raise ValueError('Impossible de localiser: {}'.format(src))

    path = os.path.join(settings.SPIP_LOGO_DIR, filename)

    if not os.access(path, os.F_OK) or force_download:
        r = requests.get(src, stream=True)
        with open(path, 'wb') as fd:
            for chunk in r.iter_content(chunk_size=128):
                fd.write(chunk)

    return path


def get_extension(path):
    ext = path.split('.')[-1]
    if len(ext) < 2 or len(ext) > 4:
        logger.warning('Extension curieuse: {}'.format(path))
        ext = None
    return ext or 'unknown'


def fetch_document(src, filename, force_download):
    extension = get_extension(filename)
    directory = os.path.join(settings.SPIP_LOGO_DIR, extension)
    if not os.access(directory, os.F_OK):
        try:
            os.mkdir(directory)
        except Exception as e:
            logger.warning('Echec à creer le dossier: {}'.format(e))
    cible = os.path.join(extension, filename)
    path = None
    try:
        path = download(src, cible, force_download)
    except Exception as e:
        logger.warning('Echec au download: {}'.format(e))
    return extension, cible, path


def fetch_and_remove_logo(article, force_download):
    def fetch_logo(src):
        """
        SPIP handles logos as a kind of hack: a file in IMG named
        'arton{}.{}'.format(article.pk, ext)
        """
        extension = get_extension(src)
        filename = 'arton{}.{}'.format(article.pk, extension)
        download(src, filename, force_download)

    def remove_img(img):
        has_siblings = [
            elem
            for elem in list(img.previous_siblings) + list(img.next_siblings)
            if elem != '\n'
        ]
        if img.parent.name in ['a', 'p'] and not has_siblings:
            img.parent.replace_with('')
        else:
            img.replace_with('')

    soup = bs(article.descriptif, 'html.parser')
    img = soup.find('img')
    src = img and img.attrs.get('src', None)

    if src and re.match(r'^(https?)?://|^/', src):
        fetch_logo(src)
        remove_img(img)
        article.descriptif = str(soup)

        # The image is usually repeated in the body with a different
        # format (e.g. without a link to the article).
        soup = bs(article.texte, 'html.parser')
        img = soup.find('img', src=src)
        if img:
            remove_img(img)
            article.texte = str(soup)

        article.save()
    elif src:
        logger.warning(
            'Article {} has ignored logo: {}'.format(article.pk, src)
        )


def filter_html(html):
    def auto_p(html):
        """
        The simplest way to reproduce Drupal's behaviour is to reuse
        its PHP code.
        """
        current_path = os.path.dirname(__file__)
        script = os.path.join(current_path, 'php/auto_p.php')
        try:
            with Popen(['php', '-f', script], stdin=PIPE, stdout=PIPE) as proc:
                stdout = proc.communicate(html.encode('utf8'))
                return stdout[0].decode('utf8')
        except Exception as e:
            raise ValueError("Echec de auto_p: {}".format(e))

    def auto_a(html):
        soup = bs(html, 'html.parser')

        email_pattern = re.compile(r'(\b[\w\.\+_-]+@(\w+\.)+\w+\b)')
        for line in soup.find_all(string=email_pattern):
            if line.parent.name == 'a':
                continue
            a_string = email_pattern.sub(
                r'<a href="mailto:\1">\1</a>', line.string
            )
            a_soup = bs(a_string, 'html.parser')
            line.replace_with(a_soup)

        protocols = [
            'http',
            'https',
            'ftp',
            'news',
            'nntp',
            'tel',
            'telnet',
            'mailto',
            'irc',
            'ssh',
            'sftp',
            'webcal',
            'rtsp',
        ]
        link_pattern = re.compile(
            r'((\b({})s?)?://(\w+\.)+\w+/?[^\s]*)'.format('|'.join(protocols))
        )
        for line in soup.find_all(string=link_pattern):
            if line.parent.name == 'a':
                continue
            a_string = link_pattern.sub(
                r'<a href="\1">\1</a>', line.string
            )
            a_soup = bs(a_string, 'html.parser')
            line.replace_with(a_soup)

        return str(soup)

    html = auto_p(html)
    html = auto_a(html)
    return html


def footnotes(html):
    bracket_pattern = re.compile(r'\[\s*(/?)\s*(fn|footnote)\s*(\s[^\]]*)?\]')
    html = bracket_pattern.sub(r'<\1\2\3>', html)

    soup = bs(html, 'html.parser')
    index = 1
    seen_values = []
    for fn in soup.find_all(['fn', 'footnote']):
        if 'value' in fn.attrs and fn.attrs['value'] in seen_values:
            value = fn.attrs['value']
            fn.clear()
        elif 'value' in fn.attrs and fn.attrs['value']:
            value = fn.attrs['value']
            seen_values.append(value)
        else:
            value = ""
            seen_values.append(str(index))
            index += 1
        if value:
            fn.wrap(soup.new_tag('spip:fn', value=value))
        else:
            fn.wrap(soup.new_tag('spip:fn'))
        # unwrap the original tag directly so that both fn and footnote
        # wrappers are handled
        fn.unwrap()
    html = str(soup)

    # SPIP had the fine idea of picking brackets and chevrons
    # to mark references.
    spip_fn_open_pattern = re.compile(r'<spip:fn>')
    spip_fn_close_pattern = re.compile(r'</spip:fn>')
    spip_fn_value_pattern = re.compile(r'<spip:fn value="([^"]*)">')
    html = spip_fn_open_pattern.sub(r'[[', html)
    html = spip_fn_close_pattern.sub(r']]', html)
    html = spip_fn_value_pattern.sub(r'[[<\1>', html)

    return html


def sanitarize_html(html, node_fmt):
    html = strong_to_dl(html)

    if node_fmt == 'PHP code':
        raise NotImplementedError("Ce node est au format PHP.")
    elif node_fmt == 'Filtered HTML':
        html = filter_html(html)
    elif node_fmt == 'Full HTML':
        pass
    else:
        raise NotImplementedError("Ce node est dans un format inconnu.")

    html = footnotes(html)
    return html


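# Hedged sketch of the whole cleanup pipeline, not executed at import time:
# the sample string is made up. 'Full HTML' only goes through strong_to_dl and
# footnotes, so Drupal's [fn]...[/fn] markup becomes SPIP's [[...]] notation;
# 'Filtered HTML' would additionally run filter_html (which needs a `php`
# binary), and 'PHP code' raises NotImplementedError.
def _sanitarize_html_example():
    sample = 'Un paragraphe déjà en HTML[fn]avec une note[/fn].'
    # Expected result: 'Un paragraphe déjà en HTML[[avec une note]].'
    return sanitarize_html(sample, 'Full HTML')

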
def convert_node(node, options):
    """
    URLs are the functional entry point, so we rely on them to decide
    whether a node still has to be imported, or has to be upgraded.
    """
    update = options.get('update', False)
    force_download = options.get('force_download', False)

    node_urls = drupal.UrlAlias.objects.filter(src='node/{}'.format(node.pk))
    spip_urls = spip.Urls.objects.filter(
        type='article', url__in=list(node_urls.values_list('dst', flat=True))
    )

    if spip_urls.exists():
        article_id = spip_urls.first().id_objet
        if (
            sorted(spip_urls.values_list('url', flat=True))
            != sorted(node_urls.values_list('dst', flat=True))
            or len(set(spip_urls.values_list('id_objet', flat=True))) != 1
            or spip.Urls.objects.filter(
                type='article', id_objet=article_id
            ).count()
            != spip_urls.count()
        ):
            # inconsistent URLs
            raise ValueError(
                "Cet article existe déjà dans SPIP sans qu'il soit possible "
                "de s'assurer qu'il s'agisse d'un import préexistant."
            )

    node_fmt = node.published_revision.format.name

    article = None
    article_attributes = {
        'date': convert_timestamp(node.created),
        'date_modif': convert_timestamp(node.changed),
        'date_redac': convert_timestamp(node.created),
        'descriptif': sanitarize_html(
            node.published_revision.teaser, node_fmt
        ),
        'maj': convert_timestamp(node.changed),
        'statut': 'publie' if node.status else 'prepa',
        'texte': sanitarize_html(node.published_revision.body, node_fmt),
        'titre': node.title,
    }

    if not spip_urls.exists():
        article = spip.Articles.objects.create(**article_attributes)
        urls = [
            spip.Urls(
                id_objet=article.pk,
                url=node_url.dst,
                date=convert_timestamp(node.created),
            )
            for node_url in node_urls
        ]
        spip.Urls.objects.bulk_create(urls)
        logger.info(
            'Article {} created from node {}.'.format(article.pk, node.pk)
        )
    elif update:
        article = spip.Articles(
            pk=spip_urls.last().id_objet, **article_attributes
        )
        article.save()
        logger.info(
            'Article {} updated from node {}.'.format(article.pk, node.pk)
        )
    else:
        logger.info(
            'Skipped node {}. Try --update to update it.'.format(node.pk)
        )

    if article:
        user_attributes = {
            'nom': node.user.name,
            'email': node.user.mail,
            'en_ligne': convert_timestamp(node.user.access),
            'maj': convert_timestamp(node.user.created),
        }
        auteur, _ = spip.Auteurs.objects.update_or_create(
            login=node.user.name, defaults=user_attributes
        )
        spip.AuteursLiens.objects.get_or_create(
            auteur=auteur, id_objet=article.pk, objet='article'
        )

        fetch_and_remove_logo(article, force_download)

        #
        # Terms
        #######

        for term_node in node.termnode_set.all():
            groupe, _ = spip.GroupesMots.objects.get_or_create(
                titre=term_node.data.theme.name,
                descriptif=term_node.data.theme.description,
                texte=term_node.data.theme.help,
                defaults={'maj': now},
            )
            mot, _ = spip.Mots.objects.get_or_create(
                groupe=groupe,
                type=groupe.titre,
                titre=term_node.data.name,
                descriptif=term_node.data.description,
                defaults={'maj': now},
            )
            spip.MotsLiens.objects.get_or_create(
                mot=mot, id_objet=article.pk, objet='article'
            )

        #
        # Uploads
        #########

        images_mimes = [
            'image/png',
            'image/jpeg',
            'image/svg+xml',
            'image/gif',
        ]
        audio_mimes = ['application/ogg', 'audio/x-wav', 'audio/mpeg']
        video_mimes = ['video/mp4']

        for upload in node.upload_set.filter(
            revision=F('node__published_revision')
        ):
            is_audio = upload.file.filemime in audio_mimes
            is_image = upload.file.filemime in images_mimes
            is_video = upload.file.filemime in video_mimes

            extension, fichier, path = fetch_document(
                upload.file.filepath, upload.file.filename, force_download
            )

            document_attributes = {
                'fichier': fichier,
            }
            document_defaults = {
                'extension': extension,
                'media': 'file',
                'titre': upload.description,
                'date': convert_timestamp(upload.file.timestamp),
                'taille': upload.file.filesize,
                'date_publication': convert_timestamp(upload.file.timestamp),
                'mode': 'image' if is_image else 'document',
            }

            if is_image:
                document_defaults['media'] = 'image'
                try:
                    m = Image.open(path)
                    document_defaults['hauteur'] = m.height
                    document_defaults['largeur'] = m.width
                except Exception as e:
                    logger.warning('Echec de lecture: {}'.format(e))
            elif is_audio:
                document_defaults['media'] = 'audio'
                try:
                    m = mediafile.MediaFile(path)
                    document_defaults['duree'] = m.length
                    if m.artist and m.album:
                        document_defaults['credits'] = '{} / {}'.format(
                            m.artist, m.album
                        )
                    elif m.artist:
                        document_defaults['credits'] = m.artist
                    elif m.album:
                        document_defaults['credits'] = m.album
                except Exception as e:
                    logger.warning('Echec de lecture: {}'.format(e))
            elif is_video:
                document_defaults['media'] = 'video'
                try:
                    m = probe_video(path)[0]
                    document_defaults['duree'] = float(m.get('duration', 0))
                    document_defaults['hauteur'] = m.get('height', 0)
                    document_defaults['largeur'] = m.get('width', 0)
                except Exception as e:
                    logger.warning('Echec de lecture: {}'.format(e))

            document, _ = spip.Documents.objects.update_or_create(
                **document_attributes,
                defaults={**document_defaults, 'maj': now},
            )
            spip.DocumentsLiens.objects.get_or_create(
                document=document,
                id_objet=article.pk,
                objet='article',
                rang_lien=upload.weight,
            )


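# Hedged sketch of how a caller (typically a management command) might drive
# convert_node; it is not executed at import time. The option keys mirror the
# ones convert_node actually reads ('update', 'force_download'); drupal.Node
# is an assumption about the name of the Drupal node model and is only meant
# as an illustration.
def _convert_all_nodes_example(update=False, force_download=False):
    options = {'update': update, 'force_download': force_download}
    for node in drupal.Node.objects.all():  # drupal.Node: assumed model name
        convert_node(node, options)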