"""Generate stats of service pad from logs"""
import argparse
import enum
import subprocess
from datetime import date, timedelta
from typing import Tuple, Optional, Dict

# Tags used in logs by etherpad to separate parts
TAG_LEVEL_INFO = "[INFO]"
TAG_LEVEL_WARN = "[WARN]"
TAG_EVENT_CREATE = "[CREATE]"
TAG_EVENT_ENTER = "[ENTER]"


class LogLevel(enum.Enum):
    """Log entry level"""
    INFO = "INFO"
    WARN = "WARN"


class EventType(enum.Enum):
    """Author Event type"""
    CREATE = "CREATE"
    ENTER = "ENTER"


class Stats:
    """Store data about usage of service.

    Sets are used for everything that is reported as a *unique* count;
    only ``enter_pad_count`` and ``exception_count`` are plain counters.
    """

    def __init__(self):
        # Every authorID seen in a CREATE or ENTER event.
        self.authors = set()
        # Pads seen in a CREATE event.
        self.created_pads = set()
        # Pads seen in any event (CREATE or ENTER).
        self.visited_pads = set()
        # Total number of ENTER events (not de-duplicated).
        self.enter_pad_count = 0
        # "pad||author" keys, one per distinct (pad, author) ENTER pair.
        self.enter_author_in_pad = set()
        # Number of WARN lines that mention "TypeError:".
        self.exception_count = 0

    def __str__(self):
        return f'Stats:\n' \
               f' author_count = {len(self.authors)}\n' \
               f' pad_created_count = {len(self.created_pads)}\n' \
               f' pad_used_count = {len(self.visited_pads)}\n' \
               f' enter_pad_count = {self.enter_pad_count}\n' \
               f' enter_pad_unique_count = {len(self.enter_author_in_pad)}\n' \
               f' exception_count = {self.exception_count}'


def compute_time_interval() -> Tuple[str, str]:
    """
    Generate default from and until date to filter logs.

    From is first day of previous month at 00:00:00.
    Until is last day of previous month at 23:59:59.

    Returns:
        A ``(since, until)`` pair of strings formatted as
        ``YYYY-MM-DD HH:MM:SS``.
    """
    today = date.today()
    first_day_cur_month = today.replace(day=1)
    # One day before the first of this month is the last day of the previous
    # month, whatever its length.
    last_day_prev_month = first_day_cur_month - timedelta(days=1)
    first_day_prev_month = last_day_prev_month.replace(day=1)
    return f"{first_day_prev_month} 00:00:00", f"{last_day_prev_month} 23:59:59"


def parse_log_line(line: bytes) -> Tuple[Optional[LogLevel], Optional[str]]:
    """Parse etherpad log line to find log level and get only useful part of line.

    Args:
        line: One raw line of log output.

    Returns:
        ``(level, rest)`` where ``rest`` is the text following the level tag,
        or ``(None, None)`` when the line carries neither known tag. When both
        tags appear in the line, ``[INFO]`` takes precedence.
    """
    # Log output may contain bytes that are not valid UTF-8; substitute them
    # instead of aborting the whole analysis on a single bad line.
    decoded_line = line.decode('utf-8', 'replace')
    decoded_line = decoded_line.rstrip('\n')

    for tag, level in ((TAG_LEVEL_INFO, LogLevel.INFO),
                       (TAG_LEVEL_WARN, LogLevel.WARN)):
        tag_index = decoded_line.find(tag)
        if tag_index != -1:
            # Slice from the position of the tag that actually matched.
            return level, decoded_line[tag_index + len(tag):]

    return None, None


def parse_event_properties(action_line: str) -> Dict[str, str]:
    """Parse properties of event based on format used by etherpad: key1:value1 key2:value2

    Tokens are separated by single spaces; tokens without a ``:`` are ignored.
    Only the first ``:`` splits key from value, so values may contain colons.
    A key that appears more than once keeps its last value.
    """
    properties = {}
    for key_word in action_line.split(" "):
        if ":" not in key_word:
            continue
        key, word = key_word.split(":", 1)
        properties[key] = word
    return properties


def parse_info_line(line: str) -> Tuple[Optional[EventType], Optional[Dict[str, str]]]:
    """Parse an INFO log line to find potential event.

    Returns:
        ``(event, properties)``. ``event`` is ``None`` when the line has no
        event tag. ``properties`` is ``None`` when there is no event or when
        nothing follows the event tag. If both tags are present, ENTER wins
        because it is checked last.
    """
    event, event_line, event_dict = None, None, None

    create_index = line.find(TAG_EVENT_CREATE)
    if create_index != -1:
        event = EventType.CREATE
        event_line = line[create_index + len(TAG_EVENT_CREATE):]

    enter_index = line.find(TAG_EVENT_ENTER)
    if enter_index != -1:
        event = EventType.ENTER
        event_line = line[enter_index + len(TAG_EVENT_ENTER):]

    if event and event_line:
        event_dict = parse_event_properties(event_line)

    return event, event_dict


def process_info_line(stats: Stats, line: str) -> None:
    """Parse INFO line and add to stats wanted data.

    Lines without an event, or whose event has no parsable properties, are
    ignored entirely (they do not count towards ``enter_pad_count`` either).
    """
    event, event_dict = parse_info_line(line)
    if not event or not event_dict:
        return

    # Add author to author set, so we can count total number of author
    author = event_dict.get('authorID', None)
    if author:
        stats.authors.add(author)

    # Add pad name to visited_pad set, so we can count used pad
    pad = event_dict.get('pad', None)
    if pad:
        stats.visited_pads.add(pad)

    # If event is CREATE, add pad to created_pad set, so we can count created pad
    if pad and event == EventType.CREATE:
        stats.created_pads.add(pad)

    if event == EventType.ENTER:
        # Count number of ENTER action
        stats.enter_pad_count += 1
        # Add pad and author concatenated in a set, so we can count uniques
        # ENTER action
        if pad and author:
            stats.enter_author_in_pad.add(f"{pad}||{author}")


def process_error_line(stats: Stats, line: str) -> None:
    """Parse WARN line and add to stats wanted data.

    Only lines containing ``TypeError:`` are counted as exceptions.
    """
    if "TypeError:" in line:
        stats.exception_count += 1


def analyze_logs(unit: str, since: str, until: str, stats: Stats) -> None:
    """Analyze logs of unit at specified time.

    Runs ``/bin/journalctl`` for ``unit`` restricted to ``[since, until]`` and
    feeds every output line to the handler matching its log level, updating
    ``stats`` in place.
    """
    args = [
        "/bin/journalctl",
        "-u", unit,
        "--since", since,
        "--until", until,
        "--no-pager",
    ]
    # Built once: the mapping does not depend on the line being processed.
    handlers = {
        LogLevel.INFO: process_info_line,
        LogLevel.WARN: process_error_line,
    }
    with subprocess.Popen(args, stdout=subprocess.PIPE) as process:
        for line in process.stdout:
            log_level, log_line = parse_log_line(line)
            process_line = handlers.get(log_level, None)
            if process_line:
                process_line(stats, log_line)


def main() -> None:
    """Entry point of generate_pad_stats command"""
    # Parse arguments
    parser = argparse.ArgumentParser(description='Generate stats for service pad.')
    parser.add_argument('--since', type=str,
                        help='Analyze log entries after specified date. '
                             'Default is first day of previous month.')
    parser.add_argument('--until', type=str,
                        help='Analyze log entries before specified date. '
                             'Default is last day of previous month.')
    parser.add_argument('--unit', type=str, default="etherpad-lite.service",
                        help='Get logs from this systemd unit')
    args = parser.parse_args()

    # Init
    stats = Stats()
    since, until = compute_time_interval()

    # Override default time filter with requested
    if args.since:
        since = args.since
    if args.until:
        until = args.until

    # Analyze logs
    analyze_logs(args.unit, since, until, stats)

    # Print result
    print(f"Date filter:\n"
          f" since = {since}\n"
          f" until = {until}\n")
    print(stats)


if __name__ == '__main__':
    main()