import json
import logging
import os
import re
from datetime import datetime
from itertools import groupby
from subprocess import PIPE, Popen
from django.conf import settings
from django.db.models import F
from django.utils.timezone import make_aware, now
import mediafile
import requests
from bs4 import BeautifulSoup as bs
from PIL import Image
from drupal2spip_lal.drupal import models as drupal
from drupal2spip_lal.spip import models as spip
logger = logging.getLogger('drupal2spip_lal')
def convert_timestamp(timestamp):
    """Turn a Drupal POSIX timestamp into a timezone-aware datetime."""
    naive = datetime.fromtimestamp(timestamp)
    return make_aware(naive)
def strong_to_dl(html):
    """
    Marie-Odile writes dl-like lists using <strong> tags.

    Parsing HTML with regexes is wrong, but we try to rebuild the <dl>
    before BeautifulSoup breaks the shape.

    NOTE(review): the <strong>/<dl>/<dt>/<dd> literals below were
    reconstructed from a corrupted copy of this file (the original
    string literals were left unterminated) -- confirm against upstream
    history.
    """
    # A "dl item" line looks like: <strong>term</strong> definition
    is_strong = r'<strong>(?P<dt>.+)</strong>(?P<dd>.*)$'

    def is_strong_item(s):
        # True when the whole line starts with a <strong> term.
        return bool(re.match(is_strong, s))

    items = re.split(r'[\r\n]+', html)
    grouped_items = groupby(items, key=is_strong_item)
    r = []
    for key, group in grouped_items:
        group = list(group)
        # Only rebuild a <dl> for runs of at least 3 consecutive
        # strong-prefixed lines; shorter runs pass through unchanged.
        if key and len(group) > 2:
            dl = ['<dl>']
            for elem in group:
                match = re.match(is_strong, elem).groupdict()
                dl += [
                    '<dt>- {}</dt>'.format(match['dt'].strip()),
                    '<dd>- {}</dd>'.format(match['dd'].strip()),
                ]
            dl.append('</dl>')
            r += dl
        else:
            r += group
    return '\n'.join(r)
def probe_video(path):
    """Run ffprobe on *path* and return only its video streams."""
    command = ['ffprobe', '-print_format', 'json', '-show_streams', path]
    with Popen(command, stdout=PIPE, stderr=PIPE) as proc:
        # ffprobe writes a JSON document on stdout; read it while the
        # process handle is still open.
        streams = json.load(proc.stdout)['streams']
    return [
        stream for stream in streams if stream["codec_type"] == "video"
    ]
def download(src, filename, force_download):
    """
    Resolve *src* against the Drupal host and download it under
    SPIP_LOGO_DIR/filename.

    The file is only fetched when missing on disk or when
    *force_download* is true. Returns the local path.

    Raises ValueError when *src* cannot be resolved to a URL, and
    requests.HTTPError on a non-2xx response.
    """
    if not src:
        # The original crashed with TypeError on None; signal the same
        # "cannot locate" error as other unresolvable sources.
        raise ValueError('Impossible de localiser: {}'.format(src))
    if src.startswith('/'):
        src = 'http://{}{}'.format(settings.DRUPAL_FQDN, src)
    elif src.startswith(('sites/default/files/', 'files/')):
        # Both Drupal file layouts resolve the same way.
        src = 'http://{}/{}'.format(settings.DRUPAL_FQDN, src)
    elif not re.match(r'^(https?)?://', src):
        raise ValueError('Impossible de localiser: {}'.format(src))
    path = os.path.join(settings.SPIP_LOGO_DIR, filename)
    if not os.access(path, os.F_OK) or force_download:
        r = requests.get(src, stream=True)
        # Don't silently save an HTML error page as the document.
        r.raise_for_status()
        with open(path, 'wb') as fd:
            for chunk in r.iter_content(chunk_size=8192):
                fd.write(chunk)
    return path
def get_extension(path):
    """
    Return the trailing extension of *path* (text after the last dot).

    Extensions shorter than 2 or longer than 4 characters are treated
    as suspicious: a warning is logged and 'unknown' is returned.
    """
    ext = path.split('.')[-1]
    if len(ext) < 2 or len(ext) > 4:
        # logger.warn is a deprecated alias of logger.warning.
        logger.warning('Extension curieuse: {}'.format(path))
        ext = None
    return ext or 'unknown'
def fetch_document(src, filename, force_download):
    """
    Download *src* into an extension-named subdirectory of
    SPIP_LOGO_DIR.

    Returns a tuple (extension, 'extension/filename', local path), with
    the local path set to None when the download failed (the failure is
    logged, not raised).
    """
    extension = get_extension(filename)
    directory = os.path.join(settings.SPIP_LOGO_DIR, extension)
    try:
        # makedirs(exist_ok=True) avoids the check-then-create race of
        # the original os.access()/os.mkdir() pair.
        os.makedirs(directory, exist_ok=True)
    except Exception as e:
        logger.warning('Echec à creer le dossier: {}'.format(e))
    cible = os.path.join(extension, filename)
    path = None
    try:
        path = download(src, cible, force_download)
    except Exception as e:
        logger.warning('Echec au download: {}'.format(e))
    return extension, cible, path
def fetch_and_remove_logo(article, force_download):
    """
    Promote the first <img> of the article's descriptif to the SPIP
    article logo, then strip that image from the descriptif (and from
    the body, where it is usually repeated).

    Images with a src that is neither absolute nor host-relative are
    left in place and logged.
    """

    def fetch_logo(src):
        """
        SPIP handles logos as a hack: a file in IMG named
        'arton{}.{}'.format(article.pk, ext)
        """
        extension = get_extension(src)
        filename = 'arton{}.{}'.format(article.pk, extension)
        download(src, filename, force_download)

    def remove_img(img):
        # When the image is the only content of its wrapping <a> or <p>,
        # drop the wrapper too instead of leaving an empty element.
        has_siblings = [
            elem
            for elem in list(img.previous_siblings) + list(img.next_siblings)
            if elem != '\n'
        ]
        if img.parent.name in ['a', 'p'] and not has_siblings:
            img.parent.replace_with('')
        else:
            img.replace_with('')

    soup = bs(article.descriptif, 'html.parser')
    img = soup.find('img')
    src = img and img.attrs.get('src', None)
    if src and re.match(r'^(https?)?://|^/', src):
        fetch_logo(src)
        remove_img(img)
        article.descriptif = str(soup)
        # The image is usually repeated in the body with a different
        # form (e.g. without the link to the article).
        soup = bs(article.texte, 'html.parser')
        img = soup.find('img', src=src)
        if img:
            remove_img(img)
            article.texte = str(soup)
        article.save()
    elif src:
        # logger.warn is a deprecated alias of logger.warning.
        logger.warning(
            'Article {} has ignored logo: {}'.format(article.pk, src)
        )
def filter_html(html):
    """
    Reproduce Drupal's 'Filtered HTML' input format: auto-paragraphing
    (via Drupal's own PHP code) then auto-linking of bare emails/URLs.
    """

    def auto_p(html):
        """
        The simplest way to reproduce Drupal's behaviour is to reuse
        the PHP code itself.
        """
        current_path = os.path.dirname(__file__)
        script = os.path.join(current_path, 'php/auto_p.php')
        try:
            with Popen(['php', '-f', script], stdin=PIPE, stdout=PIPE) as proc:
                stdout = proc.communicate(html.encode('utf8'))
                return stdout[0].decode('utf8')
        except Exception as e:
            raise ValueError("Echec de auto_p: {}".format(e))

    def auto_a(html):
        # NOTE(review): both <a ...> replacement templates below were
        # reconstructed from a corrupted copy of this file (the tags had
        # been stripped, leaving a no-op r'\1') -- confirm against
        # upstream history.
        soup = bs(html, 'html.parser')
        email_pattern = re.compile(r'(\b[\w\.\+_-]+@(\w+\.)+\w+\b)')
        for line in soup.find_all(string=email_pattern):
            # Already inside a link: leave it alone.
            if line.parent.name == 'a':
                continue
            a_string = email_pattern.sub(
                r'<a href="mailto:\1">\1</a>', line.string
            )
            a_soup = bs(a_string, 'html.parser')
            line.replace_with(a_soup)
        protocols = [
            'http',
            'https',
            'ftp',
            'news',
            'nntp',
            'tel',
            'telnet',
            'mailto',
            'irc',
            'ssh',
            'sftp',
            'webcal',
            'rtsp',
        ]
        link_pattern = re.compile(
            r'((\b({})s?)?://(\w+\.)+\w+/?[^\s]*)'.format('|'.join(protocols))
        )
        for line in soup.find_all(string=link_pattern):
            if line.parent.name == 'a':
                continue
            a_string = link_pattern.sub(
                r'<a href="\1">\1</a>', line.string
            )
            a_soup = bs(a_string, 'html.parser')
            line.replace_with(a_soup)
        return str(soup)

    html = auto_p(html)
    html = auto_a(html)
    return html
def footnotes(html):
    """
    Convert Drupal [fn]/[footnote] pseudo-tags into SPIP footnote
    markup ([[...]], with [[<ref>...]] for named/repeated notes).

    NOTE(review): the three spip:fn regex literals below were
    reconstructed from a corrupted copy of this file (they had been
    stripped down to empty patterns) -- confirm against upstream
    history.
    """
    # Turn the bracket syntax into real tags so BeautifulSoup can parse
    # nesting and attributes: [fn value="x"]...[/fn] -> <fn value="x">...</fn>
    bracket_pattern = re.compile(r'\[\s*(/?)\s*(fn|footnote)\s*(\s[^\]]*)?\]')
    html = bracket_pattern.sub(r'<\1\2\3>', html)
    soup = bs(html, 'html.parser')
    index = 1
    seen_values = []
    for fn in soup.find_all(['fn', 'footnote']):
        if 'value' in fn.attrs and fn.attrs['value'] in seen_values:
            # Repeated reference to an already-defined note: keep only
            # the reference, drop the duplicated content.
            value = fn.attrs['value']
            fn.clear()
        elif 'value' in fn.attrs and fn.attrs['value']:
            value = fn.attrs['value']
            seen_values.append(value)
        else:
            # Anonymous note: SPIP numbers it implicitly.
            value = ""
            seen_values.append(str(index))
            index += 1
        if value:
            fn.wrap(soup.new_tag('spip:fn', value=value))
        else:
            fn.wrap(soup.new_tag('spip:fn'))
        # Unwrap the original tag itself: this works for <fn> and
        # <footnote> alike, whereas the original `spip_fn.fn.unwrap()`
        # raised AttributeError on <footnote> elements.
        fn.unwrap()
    html = str(soup)
    # SPIP had the good idea of choosing brackets and chevrons to mark
    # footnote references.
    spip_fn_open_pattern = re.compile(r'<spip:fn>')
    spip_fn_close_pattern = re.compile(r'</spip:fn>')
    spip_fn_value_pattern = re.compile(r'<spip:fn value="([^"]*)">')
    html = spip_fn_open_pattern.sub(r'[[', html)
    html = spip_fn_close_pattern.sub(r']]', html)
    html = spip_fn_value_pattern.sub(r'[[<\1>', html)
    return html
def sanitarize_html(html, node_fmt):
    """
    Normalize a Drupal node body into SPIP-ready HTML: rebuild <dl>
    lists, apply Drupal's 'Filtered HTML' pipeline when the node uses
    that format, then convert footnotes to SPIP markup.

    Raises NotImplementedError for PHP nodes and for unknown formats.
    """
    html = strong_to_dl(html)
    if node_fmt == 'Filtered HTML':
        html = filter_html(html)
    elif node_fmt == 'PHP code':
        raise NotImplementedError("Ce node est au format PHP.")
    elif node_fmt != 'Full HTML':
        raise NotImplementedError("Ce node est dans un format inconnu.")
    return footnotes(html)
def convert_node(node, options):
    """
    Convert one Drupal node into a SPIP article, along with its author,
    taxonomy terms and attached documents.

    The functional entry point is the URL aliases: we rely on them to
    decide whether the import still has to be done, is already done, or
    should be upgraded.

    Options:
        update (bool): overwrite an article already imported.
        force_download (bool): re-fetch files even when present on disk.
    """
    update = options.get('update', False)
    force_download = options.get('force_download', False)
    # Drupal aliases pointing at this node, and the SPIP urls that were
    # (possibly) created for them by a previous import.
    node_urls = drupal.UrlAlias.objects.filter(src='node/{}'.format(node.pk))
    spip_urls = spip.Urls.objects.filter(
        type='article', url__in=list(node_urls.values_list('dst', flat=True))
    )
    if spip_urls.exists():
        article_id = spip_urls.first().id_objet
        # A previous import is trusted only when the SPIP urls mirror
        # the Drupal aliases exactly and all target a single article.
        if (
            sorted(spip_urls.values_list('url', flat=True))
            != sorted(node_urls.values_list('dst', flat=True))
            or len(set(spip_urls.values_list('id_objet', flat=True))) != 1
            or spip.Urls.objects.filter(
                type='article', id_objet=article_id
            ).count()
            != spip_urls.count()
        ):
            # inconsistency in the urls
            raise ValueError(
                "Cet article existe déjà dans SPIP sans qu'il soit possible "
                "de s'assurer qu'il s'agisse d'un import prééexistant."
            )
    node_fmt = node.published_revision.format.name
    article = None
    article_attributes = {
        'date': convert_timestamp(node.created),
        'date_modif': convert_timestamp(node.changed),
        'date_redac': convert_timestamp(node.created),
        'descriptif': sanitarize_html(
            node.published_revision.teaser, node_fmt
        ),
        'maj': convert_timestamp(node.changed),
        'statut': 'publie' if node.status else 'prepa',
        'texte': sanitarize_html(node.published_revision.body, node_fmt),
        'titre': node.title,
    }
    if not spip_urls.exists():
        # Fresh import: create the article and mirror every alias.
        article = spip.Articles.objects.create(**article_attributes)
        urls = [
            spip.Urls(
                id_objet=article.pk,
                url=node_url.dst,
                date=convert_timestamp(node.created),
            )
            for node_url in node_urls
        ]
        spip.Urls.objects.bulk_create(urls)
        logger.info(
            'Article {} created from node {}.'.format(article.pk, node.pk)
        )
    elif update:
        # Re-import over the existing article (same pk, urls untouched).
        article = spip.Articles(
            pk=spip_urls.last().id_objet, **article_attributes
        )
        article.save()
        logger.info(
            'Article {} updated from node {}.'.format(article.pk, node.pk)
        )
    else:
        logger.info(
            'Skipped node {}. Try --update to update it.'.format(node.pk)
        )
    # Everything below only runs when the article was created or updated.
    if article:
        # Author: upsert by login, then link to the article.
        user_attributes = {
            'nom': node.user.name,
            'email': node.user.mail,
            'en_ligne': convert_timestamp(node.user.access),
            'maj': convert_timestamp(node.user.created),
        }
        auteur, _ = spip.Auteurs.objects.update_or_create(
            login=node.user.name, defaults=user_attributes
        )
        spip.AuteursLiens.objects.get_or_create(
            auteur=auteur, id_objet=article.pk, objet='article'
        )
        fetch_and_remove_logo(article, force_download)
        #
        # Terms
        #######
        for term_node in node.termnode_set.all():
            # NOTE(review): `now` is passed uncalled; Django evaluates
            # callables found in `defaults` (Django >= 3.0) -- confirm
            # the project's Django version supports this.
            groupe, _ = spip.GroupesMots.objects.get_or_create(
                titre=term_node.data.theme.name,
                descriptif=term_node.data.theme.description,
                texte=term_node.data.theme.help,
                defaults={'maj': now},
            )
            mot, _ = spip.Mots.objects.get_or_create(
                groupe=groupe,
                type=groupe.titre,
                titre=term_node.data.name,
                descriptif=term_node.data.description,
                defaults={'maj': now},
            )
            spip.MotsLiens.objects.get_or_create(
                mot=mot, id_objet=article.pk, objet='article'
            )
        #
        # Uploads
        #########
        images_mimes = [
            'image/png',
            'image/jpeg',
            'image/svg+xml',
            'image/gif',
        ]
        audio_mimes = ['application/ogg', 'audio/x-wav', 'audio/mpeg']
        video_mimes = ['video/mp4']
        # Only files attached to the node's published revision.
        for upload in node.upload_set.filter(
            revision=F('node__published_revision')
        ):
            is_audio = upload.file.filemime in audio_mimes
            is_image = upload.file.filemime in images_mimes
            is_video = upload.file.filemime in video_mimes
            extension, fichier, path = fetch_document(
                upload.file.filepath, upload.file.filename, force_download
            )
            # 'fichier' is the lookup key; everything else is refreshed.
            document_attributes = {
                'fichier': fichier,
            }
            document_defaults = {
                'extension': extension,
                'media': 'file',
                'titre': upload.description,
                'date': convert_timestamp(upload.file.timestamp),
                'taille': upload.file.filesize,
                'date_publication': convert_timestamp(upload.file.timestamp),
                'mode': 'image' if is_image else 'document',
            }
            # Per-media enrichment is best-effort: a reading failure is
            # logged and the document is stored without the metadata.
            if is_image:
                document_defaults['media'] = 'image'
                try:
                    m = Image.open(path)
                    document_defaults['hauteur'] = m.height
                    document_defaults['largeur'] = m.width
                except Exception as e:
                    logger.warn('Echec de lecture: {}'.format(e))
            elif is_audio:
                document_defaults['media'] = 'audio'
                try:
                    m = mediafile.MediaFile(path)
                    document_defaults['duree'] = m.length
                    if m.artist and m.album:
                        document_defaults['credits'] = '{} / {}'.format(
                            m.artist, m.album
                        )
                    elif m.artist:
                        document_defaults['credits'] = m.artist
                    elif m.album:
                        document_defaults['credits'] = m.album
                except Exception as e:
                    logger.warn('Echec de lecture: {}'.format(e))
            elif is_video:
                document_defaults['media'] = 'video'
                try:
                    m = probe_video(path)[0]
                    document_defaults['duree'] = float(m.get('duration', 0))
                    document_defaults['hauteur'] = m.get('height', 0)
                    document_defaults['largeur'] = m.get('width', 0)
                except Exception as e:
                    logger.warn('Echec de lecture: {}'.format(e))
            document, _ = spip.Documents.objects.update_or_create(
                **document_attributes,
                defaults={**document_defaults, 'maj': now},
            )
            spip.DocumentsLiens.objects.get_or_create(
                document=document,
                id_objet=article.pk,
                objet='article',
                rang_lien=upload.weight,
            )