BookStack-Python-exporter/exporter.py
2023-12-31 16:20:27 +01:00

521 lines
17 KiB
Python

import argparse
import json
import logging
import os
from datetime import datetime
from logging import info, error, debug
from pathlib import Path
import sys
from typing import Dict, List, Union
from urllib.request import urlopen, Request
import urllib.parse
import base64
# (formatName, fileExtension)
FORMATS: Dict['str', 'str'] = {
'markdown': 'md',
'plaintext': 'txt',
'pdf': 'pdf',
'html': 'html'
}
LEVELS = ['pages', 'chapters', 'books']
LOG_LEVEL: Dict = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR
}
# Characters in filenames to be replaced with "_"
FORBIDDEN_CHARS: List[str] = ["/", "#"]
parser = argparse.ArgumentParser(description='BookStack exporter')
parser.add_argument('-p',
'--path',
type=str,
default='.',
help='Path where exported files will be placed.')
parser.add_argument(
'-t',
'--token-file',
type=str,
default=f'.{os.path.sep}token.txt',
help='File containing authorization token in format TOKEN_ID:TOKEN_SECRET')
parser.add_argument(
'-H',
'--host',
type=str,
default='https://localhost',
help='Your domain with protocol prefix, example: https://example.com')
parser.add_argument('-f',
'--formats',
type=str,
default=['markdown'],
nargs="+",
help='Space separated list of formats to use for export.',
choices=FORMATS.keys())
parser.add_argument('-c',
'--forbidden-chars',
type=str,
default=FORBIDDEN_CHARS,
nargs="+",
help='Space separated list of symbols to be replaced '
'with "_" in filenames.')
parser.add_argument('-u',
'--user-agent',
type=str,
default="BookStack exporter",
help='User agent header content. In situations'
' where requests are blocked because of bad client/'
'unrecognized web browser/etc (like with CloudFlare'
' tunnels), change to some typical '
'web browser user agent header.')
parser.add_argument('--additional-headers',
type=str,
nargs="+",
default=[],
help='List of arbitrary additional HTTP headers to be '
'sent with every HTTP request. They can override default'
' ones, including Authorization header. IMPORTANT: '
'these headers are also sent when downloading external '
'attachments! Don\'t put here any private data.'
'Example: -u "Header1: value1" "Header2: value2"')
parser.add_argument(
'-l',
'--level',
type=str,
default=['pages'],
nargs="+",
help="Space separated list of levels at which should be export "
"performed. ",
choices=LEVELS)
parser.add_argument(
'--force-update-files',
action='store_true',
help="Set this option to skip checking local files timestamps against "
"remote last edit timestamps. This will cause overwriting local files,"
" even if they seem to be already in newest version.")
parser.add_argument(
'--dont-export-attachments',
action='store_true',
help=
"Set this to prevent exporting attachments that were uploaded to BookStack."
)
parser.add_argument(
'--dont-export-external-attachments',
action='store_true',
help="Set this to prevent exporting external attachments (from links).")
parser.set_defaults(force_update_files=False)
parser.set_defaults(dont_export_attachments=False)
parser.set_defaults(dont_export_external_attachments=False)
parser.add_argument('-V',
'--log-level',
type=str,
default='info',
help='Set verbosity level.',
choices=LOG_LEVEL.keys())
args = parser.parse_args()
def removesuffix(text, suffix):
"""Remove suffix from text if matched."""
if text.endswith(suffix):
return text[:len(text) - len(suffix)]
return text
logging.basicConfig(format='%(levelname)s :: %(message)s',
level=LOG_LEVEL.get(args.log_level))
formats: List[str] = args.formats
FORBIDDEN_CHARS = args.forbidden_chars
for frmt in formats:
if frmt not in FORMATS:
error("Unknown format name (NOT file extension), "
"check api docs for current version of your BookStack")
sys.exit(1)
API_PREFIX: str = f"{removesuffix(args.host, os.path.sep)}/api"
FS_PATH: str = removesuffix(args.path, os.path.sep)
LEVEL_CHOICE: List[str] = args.level
for lvl in LEVEL_CHOICE:
if lvl not in LEVELS:
error(f"Level {lvl} is not supported, can be only one of {LEVELS}")
sys.exit(1)
with open(args.token_file, 'r', encoding='utf-8') as f:
TOKEN: str = removesuffix(f.readline(), '\n')
HEADERS = {
'Content-Type': 'application/json; charset=utf-8',
'Authorization': f"Token {TOKEN}",
'User-Agent': args.user_agent
}
HEADERS_NO_TOKEN = {
'Content-Type': 'application/json; charset=utf-8',
'User-Agent': args.user_agent
}
for header in args.additional_headers:
values = header.split(':', 1)
if len(values) < 2:
raise ValueError(f"Improper HTTP header specification: {header}")
HEADERS[values[0]] = values[1]
HEADERS_NO_TOKEN[values[0]] = values[1]
SKIP_TIMESTAMPS: bool = args.force_update_files
class Node:
"""Clas representing any node in whole bookstack documents "tree"."""
def __init__(self, name: str, parent: Union['Node', None], node_id: int,
last_edit_timestamp: datetime):
for char in FORBIDDEN_CHARS:
name = name.replace(char, "_")
self.__name: str = name
self.__children: List['Node'] = []
self.__parent: Union['Node', None] = parent
if parent is not None:
parent.add_child(self)
self.__last_edit_timestamp: datetime = last_edit_timestamp
self.__node_id = node_id
@property
def name(self) -> str:
"""Return name of this Shelf/Book/Chapter/Page."""
return self.__name
@property
def parent(self) -> Union['Node', None]:
"""Return parent Node or None if there isn't any."""
return self.__parent
def changed_since(self, timestamp: datetime) -> int:
"""
Check if remote version have changed after given timestamp,
including its children
:param timestamp:
:return: amount of changed documents at level of this document Node
"""
result: int = 0
if self.__last_edit_timestamp > timestamp:
result += 1
for child in self.__children:
result += child.changed_since(timestamp)
return result
def get_last_edit_timestamp(self) -> datetime:
return self.__last_edit_timestamp
def set_parent(self, parent: 'Node'):
self.__parent = parent
parent.add_child(self)
def add_child(self, child: 'Node'):
self.__children.append(child)
def get_path(self) -> str:
if self.__parent is None:
return "."
return self.__parent.get_path() + os.path.sep + self.__parent.name
def get_id(self) -> int:
return self.__node_id
shelves: Dict[int, Node] = {}
books: Dict[int, Node] = {}
chapters: Dict[int, Node] = {}
pages: Dict[int, Node] = {}
pages_not_in_chapter: Dict[int, Node] = {}
attachments: Dict[int, Node] = {}
def api_timestamp_string_to_datetime(timestamp: str) -> datetime:
return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
def make_dir(path: str):
path_obj = Path(path)
if path_obj.exists():
return
info(f"Creating dir {path}")
path_obj.mkdir(exist_ok=True, parents=True)
def api_get_bytes(path: str, **kwargs) -> bytes:
request_path: str = f'{API_PREFIX}/{path}'
if len(kwargs) > 0:
params: str = urllib.parse.urlencode(kwargs)
request_path += f"?{params}"
debug(f"Making http request: {request_path}")
request: Request = Request(request_path, headers=HEADERS)
with urlopen(request) as response:
if response.status == 403:
error("403 Forbidden, check your token!")
sys.exit(response.status)
return response.read()
def api_get_dict(path: str) -> dict:
"""Make api request at specified path and return result as dict."""
data = api_get_bytes(path).decode()
return json.loads(data)
def api_get_listing(path: str) -> list:
"""Retrieve whole lists through api.
Request for another 50 until have collected "total" amount.
:param path:
:return:
"""
count: int = 50
total: int = count
result: list = []
while total > len(result):
data: dict = json.loads(
api_get_bytes(path, count=count, offset=len(result)))
total = data['total']
result += data['data']
debug(f"API listing got {len(result)} items out of maximum {count}")
return result
def check_if_update_needed(file_path: str, document: Node) -> bool:
"""Check if a Node need updating on disk, according to timestamps."""
if SKIP_TIMESTAMPS:
return True
debug(f"Checking for update for file {file_path}")
if not os.path.exists(file_path):
debug(f"Document {file_path} is missing on disk, update needed.")
return True
local_last_edit: datetime = datetime.fromtimestamp(
os.path.getmtime(file_path))
remote_last_edit: datetime = document.get_last_edit_timestamp()
debug("Local file creation timestamp: "
f"{local_last_edit.date()} {local_last_edit.time()}, "
"remote edit timestamp: "
f"{remote_last_edit.date()} {remote_last_edit.time()}")
changes: int = document.changed_since(local_last_edit)
if changes > 0:
info(f"Document \"{file_path}\" consists of {changes} "
"outdated documents, update needed.")
return True
debug(f"Document \"{file_path}\" consists of {changes} "
"outdated documents, skipping updating.")
return False
def export_doc(documents: List[Node], level: str):
"""Save document-like Nodes to files."""
for document in documents:
make_dir(f"{FS_PATH}{os.path.sep}{document.get_path()}")
for v_format in formats:
path: str = f"{FS_PATH}{os.path.sep}{document.get_path()}" + \
f"{os.path.sep}{document.name}.{FORMATS[v_format]}"
if not check_if_update_needed(path, document):
continue
data: bytes = api_get_bytes(
f'{level}/{document.get_id()}/export/{v_format}')
with open(path, 'wb') as file:
info(f"Saving {path}")
file.write(data)
def export_attachments(attachments: List[Node]):
"""Save attachment Nodes to files."""
for attachment in attachments:
base_path = attachment.get_path()
if attachment.parent is None:
base_path = f'__ATTACHMENTS_FROM_DELETED_PAGES__{os.path.sep}{base_path}'
make_dir(f"{FS_PATH}{os.path.sep}{base_path}")
path: str = f"{FS_PATH}{os.path.sep}{base_path}" + \
f"{os.path.sep}{attachment.name}"
if not check_if_update_needed(path, attachment):
continue
data = api_get_bytes(f'attachments/{attachment.get_id()}')
data = json.loads(data)
content = data['content']
content_url = urllib.parse.urlparse(content)
if content_url.scheme:
if args.dont_export_external_attachments:
continue
info(f"Downloading attachment from url: {content_url.geturl()}")
request: Request = Request(content_url.geturl(),
headers=HEADERS_NO_TOKEN)
with urlopen(request) as response:
if response.status >= 300:
error(
"Could not download link-type attachment from "
f"'{content_url.geturl()}, got code {response.status}'!"
)
sys.exit(response.status)
with open(path, 'wb') as file:
info(f"Saving {path}")
file.write(response.read())
else:
with open(path, 'wb') as file:
info(f"Saving {path}")
file.write(base64.b64decode(content))
#########################
# Gathering data from api
#########################
info("Getting info about Shelves and their Books")
for shelf_data in api_get_listing('shelves'):
last_edit_ts: datetime = api_timestamp_string_to_datetime(
shelf_data['updated_at'])
shelf = Node(shelf_data.get('name'), None, shelf_data.get('id'),
last_edit_ts)
debug(f"Shelf: \"{shelf.name}\", ID: {shelf.get_id()}")
shelves[shelf.get_id()] = shelf
shelf_details = api_get_dict(f'shelves/{shelf.get_id()}')
if shelf_details.get('books') is None:
continue
for book_data in shelf_details['books']:
last_edit_ts: datetime = api_timestamp_string_to_datetime(
book_data['updated_at'])
book = Node(book_data.get('name'), shelf, book_data.get('id'),
last_edit_ts)
debug(f"Book: \"{book.name}\", ID: {book.get_id()}")
books[book.get_id()] = book
info("Getting info about Books not belonging to any shelf")
for book_data in api_get_listing('books'):
if book_data.get('id') in books:
continue
last_edit_ts: datetime = api_timestamp_string_to_datetime(
book_data['updated_at'])
book = Node(book_data.get('name'), None, book_data.get('id'), last_edit_ts)
debug(f"Book: \"{book.name}\", ID: {book.get_id()}, "
f"last edit: {book.get_last_edit_timestamp()}")
info(f"Book \"{book.name} has no shelf assigned.\"")
books[book.get_id()] = book
info("Getting info about Chapters")
for chapter_data in api_get_listing('chapters'):
last_edit_ts: datetime = api_timestamp_string_to_datetime(
chapter_data['updated_at'])
chapter = Node(chapter_data.get('name'),
books.get(chapter_data.get('book_id')),
chapter_data.get('id'), last_edit_ts)
debug(f"Chapter: \"{chapter.name}\", ID: {chapter.get_id()},"
f" last edit: {chapter.get_last_edit_timestamp()}")
chapters[chapter.get_id()] = chapter
info("Getting info about Pages")
for page_data in api_get_listing('pages'):
parent_id = page_data.get('chapter_id')
last_edit_ts: datetime = api_timestamp_string_to_datetime(
page_data['updated_at'])
if parent_id not in chapters:
parent = books[page_data['book_id']]
page = Node(page_data.get('name'), parent, page_data.get('id'),
last_edit_ts)
info(f"Page \"{page.name}\" is not in any chapter, "
f"using Book \"{parent.name}\" as a parent.")
debug(f"Page: \"{page.name}\", ID: {page.get_id()},"
f" last edit: {page.get_last_edit_timestamp()}")
pages[page.get_id()] = page
pages_not_in_chapter[page.get_id()] = page
continue
page = Node(page_data.get('name'), chapters.get(parent_id),
page_data.get('id'), last_edit_ts)
debug(f"Page: \"{page.name}\", ID: {page.get_id()}, "
f"last edit: {page.get_last_edit_timestamp()}")
pages[page.get_id()] = page
if not args.dont_export_attachments:
info("Getting info about Attachments.")
for attachment_data in api_get_listing('attachments'):
last_edit_ts: datetime = api_timestamp_string_to_datetime(
attachment_data['updated_at'])
all_pages = {}
all_pages.update(pages)
all_pages.update(pages_not_in_chapter)
attachment = Node(attachment_data.get('name'),
all_pages.get(attachment_data.get('uploaded_to')),
attachment_data.get('id'), last_edit_ts)
debug(f"Attachment: \"{attachment.name}\", ID: {attachment.get_id()},"
f" last edit: {attachment.get_last_edit_timestamp()}")
attachments[attachment.get_id()] = attachment
#########################
# Exporting data from api
#########################
files: List[Node] = []
EXPORT_PAGES_NOT_IN_CHAPTER: bool = False
for lvl in LEVEL_CHOICE:
if lvl == 'pages':
files = list(pages.values())
elif lvl == 'chapters':
files = list(chapters.values())
EXPORT_PAGES_NOT_IN_CHAPTER = True
elif lvl == 'books':
files = list(books.values())
export_doc(files, lvl)
if EXPORT_PAGES_NOT_IN_CHAPTER:
info("Exporting pages that are not in chapter...")
export_doc(list(pages_not_in_chapter.values()), 'pages')
if not args.dont_export_attachments:
export_attachments(list(attachments.values()))
info("Finished")
sys.exit(0)