# drupal2spip_lal/drupal2spip_lal/base/convert.py
import json
import logging
import os
import re
from datetime import datetime
from itertools import groupby
from subprocess import PIPE, Popen

from django.conf import settings
from django.db.models import F
from django.utils.timezone import make_aware, now

import mediafile
import requests
from bs4 import BeautifulSoup as bs
from PIL import Image

from drupal2spip_lal.drupal import models as drupal
from drupal2spip_lal.spip import models as spip

logger = logging.getLogger('drupal2spip_lal')


def convert_timestamp(timestamp):
    return make_aware(datetime.fromtimestamp(timestamp))


def strong_to_dl(html):
    """
    Marie-Odile builds dl-like lists out of strong tags.
    Parsing HTML with regexes is wrong, but we try to rebuild the dl
    before bs mangles the markup.
    """
    is_strong = r'<strong>(?P<dt>.+)</strong>(?P<dd>.*)$'

    def is_strong_item(s):
        return bool(re.match(is_strong, s))

    items = re.split(r'[\r\n]+', html)
    grouped_items = groupby(items, key=is_strong_item)
    r = []
    for key, group in grouped_items:
        group = list(group)
        if key and len(group) > 2:
            dl = ['<dl class="strong_to_dl">']
            for elem in group:
                match = re.match(is_strong, elem).groupdict()
                dl += [
                    '<dt>{}</dt>'.format(match['dt'].strip()),
                    '<dd>{}</dd>'.format(match['dd'].strip()),
                ]
            dl.append('</dl>')
            r += dl
        else:
            r += group
    return '\n'.join(r)
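
# Illustrative sketch (hypothetical markup) of what strong_to_dl rebuilds:
# three or more consecutive lines shaped like
#   <strong>Term</strong> definition text
# are regrouped into
#   <dl class="strong_to_dl"><dt>Term</dt><dd>definition text</dd>...</dl>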


def probe_video(path):
    with Popen(
        ['ffprobe', '-print_format', 'json', '-show_streams', path],
        stdout=PIPE,
        stderr=PIPE,
    ) as proc:
        streams = json.load(proc.stdout)['streams']
    return [s for s in streams if s["codec_type"] == "video"]
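
# probe_video leans on `ffprobe -print_format json -show_streams`; the
# returned stream dicts are expected to expose keys such as 'codec_type',
# 'duration', 'height' and 'width' (consumed by the video branch of
# convert_node below).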


def download(src, filename, force_download):
    if src and src.startswith('/'):
        src = 'http://{}{}'.format(settings.DRUPAL_FQDN, src)
    elif src and src.startswith('sites/default/files/'):
        src = 'http://{}/{}'.format(settings.DRUPAL_FQDN, src)
    elif not re.match(r'^(https?)?://', src):
        raise ValueError('Unable to locate: {}'.format(src))
    path = os.path.join(settings.SPIP_LOGO_DIR, filename)
    if not os.access(path, os.F_OK) or force_download:
        r = requests.get(src, stream=True)
        with open(path, 'wb') as fd:
            for chunk in r.iter_content(chunk_size=128):
                fd.write(chunk)
    return path


def fetch_document(src, filename, force_download):
    extension = filename.split('.')[-1] or 'unknown'
    directory = os.path.join(settings.SPIP_LOGO_DIR, extension)
    if not os.access(directory, os.F_OK):
        try:
            os.mkdir(directory)
        except Exception as e:
            logger.warning('Failed to create directory: {}'.format(e))
    cible = os.path.join(extension, filename)
    path = None
    try:
        path = download(src, cible, force_download)
    except Exception as e:
        logger.warning('Download failed: {}'.format(e))
    return extension, cible, path
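
# Documents end up under settings.SPIP_LOGO_DIR grouped by extension: a
# hypothetical 'report.pdf' would be stored as 'pdf/report.pdf' relative to
# that directory.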


def fetch_and_remove_logo(article, force_download):
    def fetch_logo(src):
        """
        SPIP handles logos as a kind of hack: a file in IMG named
        'arton{}.{}'.format(article.pk, ext)
        """
        ext = src.split('.')[-1]
        filename = 'arton{}.{}'.format(article.pk, ext)
        download(src, filename, force_download)

    def remove_img(img):
        has_siblings = [
            elem
            for elem in list(img.previous_siblings) + list(img.next_siblings)
            if elem != '\n'
        ]
        if img.parent.name in ['a', 'p'] and not has_siblings:
            img.parent.replace_with('')
        else:
            img.replace_with('')

    soup = bs(article.descriptif, 'html.parser')
    img = soup.find('img')
    src = img and img.attrs.get('src', None)
    if src and re.match(r'^(https?)?://|^/', src):
        fetch_logo(src)
        remove_img(img)
        article.descriptif = str(soup)
        # The image is usually repeated in the body with a different
        # format (e.g. without a link to the article).
        soup = bs(article.texte, 'html.parser')
        img = soup.find('img', src=src)
        if img:
            remove_img(img)
            article.texte = str(soup)
        article.save()
    elif src:
        logger.warning(
            'Article {} has ignored logo: {}'.format(article.pk, src)
        )
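
# SPIP picks up an article logo purely by file name: for a hypothetical
# article with pk 42 and a PNG source, fetch_logo above stores the file as
# 'arton42.png' inside settings.SPIP_LOGO_DIR.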


def filter_html(html):
    def auto_p(html):
        """
        The simplest way to reproduce Drupal's behaviour is to reuse
        its PHP code.
        """
        current_path = os.path.dirname(__file__)
        script = os.path.join(current_path, 'php/auto_p.php')
        try:
            with Popen(['php', '-f', script], stdin=PIPE, stdout=PIPE) as proc:
                stdout = proc.communicate(html.encode('utf8'))
                return stdout[0].decode('utf8')
        except Exception as e:
            raise ValueError("auto_p failed: {}".format(e))

    def auto_a(html):
        soup = bs(html, 'html.parser')
        email_pattern = re.compile(r'(\b[\w\.\+_-]+@(\w+\.)+\w+\b)')
        for line in soup.find_all(string=email_pattern):
            if line.parent.name == 'a':
                continue
            a_string = email_pattern.sub(
                r'<a class="auto-a" href="mailto:\1">\1</a>', line.string
            )
            a_soup = bs(a_string, 'html.parser')
            line.replace_with(a_soup)
        protocols = [
            'http',
            'https',
            'ftp',
            'news',
            'nntp',
            'tel',
            'telnet',
            'mailto',
            'irc',
            'ssh',
            'sftp',
            'webcal',
            'rtsp',
        ]
        link_pattern = re.compile(
            r'((\b({})s?)?://(\w+\.)+\w+/?[^\s]*)'.format('|'.join(protocols))
        )
        for line in soup.find_all(string=link_pattern):
            if line.parent.name == 'a':
                continue
            a_string = link_pattern.sub(
                r'<a class="auto-a" href="\1">\1</a>', line.string
            )
            a_soup = bs(a_string, 'html.parser')
            line.replace_with(a_soup)
        return str(soup)

    html = auto_p(html)
    html = auto_a(html)
    return html
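
# Illustrative effect of auto_a (hypothetical address): a bare
# 'jane@example.org' in a text node becomes
# '<a class="auto-a" href="mailto:jane@example.org">jane@example.org</a>',
# while text already inside an <a> is left untouched.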


def footnotes(html):
    bracket_pattern = re.compile(r'\[\s*(/?)\s*(fn|footnote)\s*(\s[^\]]*)?\]')
    html = bracket_pattern.sub(r'<\1\2\3>', html)
    soup = bs(html, 'html.parser')
    index = 1
    seen_values = []
    for fn in soup.find_all(['fn', 'footnote']):
        if 'value' in fn.attrs and fn.attrs['value'] in seen_values:
            value = fn.attrs['value']
            fn.clear()
        elif 'value' in fn.attrs and fn.attrs['value']:
            value = fn.attrs['value']
            seen_values.append(value)
        else:
            value = ""
            seen_values.append(str(index))
            index += 1
        if value:
            spip_fn = fn.wrap(soup.new_tag('spip:fn', value=value))
        else:
            spip_fn = fn.wrap(soup.new_tag('spip:fn'))
        spip_fn.fn.unwrap()
    html = str(soup)
    # SPIP had the bright idea of picking brackets and angle brackets
    # to mark references.
    spip_fn_open_pattern = re.compile(r'<spip:fn>')
    spip_fn_close_pattern = re.compile(r'</spip:fn>')
    spip_fn_value_pattern = re.compile(r'<spip:fn value="([^"]+)">')
    html = spip_fn_open_pattern.sub(r'[[', html)
    html = spip_fn_close_pattern.sub(r']]', html)
    html = spip_fn_value_pattern.sub(r'[[<\1>', html)
    return html
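
# Illustrative outcome of footnotes() on hypothetical markup: a Drupal
# '[fn]Some note[/fn]' ends up as the SPIP reference '[[Some note]]', and
# '[fn value="3"]Some note[/fn]' as '[[<3>Some note]]'.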


def sanitarize_html(html, node_fmt):
    html = strong_to_dl(html)
    if node_fmt == 'PHP code':
        raise NotImplementedError("This node is in PHP format.")
    elif node_fmt == 'Filtered HTML':
        html = filter_html(html)
    elif node_fmt == 'Full HTML':
        pass
    else:
        raise NotImplementedError("This node is in an unknown format.")
    html = footnotes(html)
    return html


def convert_node(node, options):
    """
    URLs are the functional entry point, so we rely on them to check
    whether the import still has to be done, or needs to be upgraded.
    """
    update = options.get('update', False)
    force_download = options.get('force_download', False)
    node_urls = drupal.UrlAlias.objects.filter(src='node/{}'.format(node.pk))
    spip_urls = spip.Urls.objects.filter(
        type='article', url__in=list(node_urls.values_list('dst', flat=True))
    )
    if spip_urls.exists():
        article_id = spip_urls.first().id_objet
        if (
            sorted(spip_urls.values_list('url', flat=True))
            != sorted(node_urls.values_list('dst', flat=True))
            or len(set(spip_urls.values_list('id_objet', flat=True))) != 1
            or spip.Urls.objects.filter(
                type='article', id_objet=article_id
            ).count()
            != spip_urls.count()
        ):
            # inconsistent URLs
            raise ValueError(
                "This article already exists in SPIP but it is not possible "
                "to make sure it comes from a previous import."
            )

    node_fmt = node.published_revision.format.name
    article = None
    article_attributes = {
        'date': convert_timestamp(node.published_revision.timestamp),
        'date_modif': convert_timestamp(node.changed),
        'date_redac': convert_timestamp(node.created),
        'descriptif': sanitarize_html(
            node.published_revision.teaser, node_fmt
        ),
        'maj': convert_timestamp(node.changed),
        'statut': 'publie' if node.status else 'prepa',
        'texte': sanitarize_html(node.published_revision.body, node_fmt),
        'titre': node.title,
    }
    if not spip_urls.exists():
        article = spip.Articles.objects.create(**article_attributes)
        urls = [
            spip.Urls(
                id_objet=article.pk,
                url=node_url.dst,
                date=convert_timestamp(node.created),
            )
            for node_url in node_urls
        ]
        spip.Urls.objects.bulk_create(urls)
        logger.info(
            'Article {} created from node {}.'.format(article.pk, node.pk)
        )
    elif update:
        article = spip.Articles(
            pk=spip_urls.last().id_objet, **article_attributes
        )
        article.save()
        logger.info(
            'Article {} updated from node {}.'.format(article.pk, node.pk)
        )
    else:
        logger.info(
            'Skipped node {}. Try --update to update it.'.format(node.pk)
        )

    if article:
        user_attributes = {
            'nom': node.user.name,
            'email': node.user.mail,
            'en_ligne': convert_timestamp(node.user.access),
            'maj': convert_timestamp(node.user.created),
        }
        auteur, _ = spip.Auteurs.objects.update_or_create(
            login=node.user.name, defaults=user_attributes
        )
        spip.AuteursLiens.objects.get_or_create(
            auteur=auteur, id_objet=article.pk, objet='article'
        )
        fetch_and_remove_logo(article, force_download)

        #
        # Terms
        #######
        for term_node in node.termnode_set.all():
            groupe, _ = spip.GroupesMots.objects.get_or_create(
                titre=term_node.data.theme.name,
                descriptif=term_node.data.theme.description,
                texte=term_node.data.theme.help,
                defaults={'maj': now},
            )
            mot, _ = spip.Mots.objects.get_or_create(
                groupe=groupe,
                type=groupe.titre,
                titre=term_node.data.name,
                descriptif=term_node.data.description,
                defaults={'maj': now},
            )
            spip.MotsLiens.objects.get_or_create(
                mot=mot, id_objet=article.pk, objet='article'
            )

        #
        # Uploads
        #########
        images_mimes = [
            'image/png',
            'image/jpeg',
            'image/svg+xml',
            'image/gif',
        ]
        audio_mimes = ['application/ogg', 'audio/x-wav', 'audio/mpeg']
        video_mimes = ['video/mp4']
        for upload in node.upload_set.filter(
            revision=F('node__published_revision')
        ):
            is_audio = upload.file.filemime in audio_mimes
            is_image = upload.file.filemime in images_mimes
            is_video = upload.file.filemime in video_mimes
            extension, fichier, path = fetch_document(
                upload.file.filepath, upload.file.filename, force_download
            )
            document_attributes = {
                'media': 'file',
                'extension': extension,
                'fichier': fichier,
            }
            document_defaults = {
                'titre': upload.description,
                'date': convert_timestamp(upload.file.timestamp),
                'taille': upload.file.filesize,
                'date_publication': convert_timestamp(upload.file.timestamp),
                'mode': 'image' if is_image else 'document',
            }
            if is_image:
                document_defaults['media'] = 'image'
                try:
                    m = Image.open(path)
                    document_defaults['hauteur'] = m.height
                    document_defaults['largeur'] = m.width
                except Exception as e:
                    logger.warning('Failed to read file: {}'.format(e))
            elif is_audio:
                document_defaults['media'] = 'audio'
                try:
                    m = mediafile.MediaFile(path)
                    document_defaults['duree'] = m.length
                    if m.artist and m.album:
                        document_defaults['credits'] = '{} / {}'.format(
                            m.artist, m.album
                        )
                    elif m.artist:
                        document_defaults['credits'] = m.artist
                    elif m.album:
                        document_defaults['credits'] = m.album
                except Exception as e:
                    logger.warning('Failed to read file: {}'.format(e))
            elif is_video:
                document_defaults['media'] = 'video'
                try:
                    m = probe_video(path)[0]
                    document_defaults['duree'] = float(m.get('duration', 0))
                    document_defaults['hauteur'] = m.get('height', 0)
                    document_defaults['largeur'] = m.get('width', 0)
                except Exception as e:
                    logger.warning('Failed to read file: {}'.format(e))
            document, _ = spip.Documents.objects.update_or_create(
                **document_attributes,
                defaults={**document_defaults, 'maj': now},
            )
            spip.DocumentsLiens.objects.get_or_create(
                document=document,
                id_objet=article.pk,
                objet='article',
                rang_lien=upload.weight,
            )
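
# Hypothetical invocation from a management command (not shown in this
# module): convert_node(node, {'update': True, 'force_download': False}).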