#!/usr/bin/env python3
# -*- coding: utf-8 -*-
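"""Export a Zulip stream (optionally a single topic) to a standalone HTML
archive, with attachments cached locally and images optionally re-encoded
as WEBP.

Example invocation (host, credentials, and script name are illustrative):
    python zulip_export.py --base-url https://zulip.example.com \
        --email you@example.com --api-key YOUR_API_KEY \
        --stream engineering --out zulip_export
"""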
import argparse
import concurrent.futures
import json
import os
import re
import shutil
import time
from datetime import datetime
from html import escape
from urllib.parse import urljoin, urlparse
import requests
try:
from PIL import Image, ImageOps
except ImportError:
Image = None
ImageOps = None
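# Matches absolute or root-relative /user_uploads/ attachment URLs inside the
# rendered message HTML.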
USER_UPLOADS_RE = re.compile(r'(https?://[^\s"\'<>]+?/user_uploads/[^\s"\'<>]+|/user_uploads/[^\s"\'<>]+)')
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tif", ".tiff", ".webp"}
HTML_TEMPLATE = """
Zulip Archive
"""
def zulip_get_messages(session, base_url, narrow, anchor="newest", num_before=500, num_after=0):
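    """Fetch one page of messages via GET /api/v1/messages for the given narrow."""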
params = {
"anchor": anchor,
"num_before": num_before,
"num_after": num_after,
"narrow": json.dumps(narrow),
"apply_markdown": "true"
}
url = urljoin(base_url, "/api/v1/messages")
r = session.get(url, params=params)
if r.status_code >= 400:
print("ERROR STATUS:", r.status_code)
print("ERROR BODY:", r.text)
r.raise_for_status()
return r.json()
def ensure_dir(path):
os.makedirs(path, exist_ok=True)
def sanitize_filename(name):
return re.sub(r'[^A-Za-z0-9._-]+', '_', name)
def sanitize_output_basename(name):
sanitized = re.sub(r'[^A-Za-z0-9]+', '_', name or '')
sanitized = re.sub(r'_+', '_', sanitized).strip('_')
return sanitized
def to_web_path(path):
return path.replace("\\", "/")
def is_image_upload_url(upload_url):
path = urlparse(upload_url).path
ext = os.path.splitext(path)[1].lower()
return ext in IMAGE_EXTENSIONS
def download_upload(session, base_url, upload_url, target_dir):
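    """Download one attachment into target_dir, reusing an existing local copy if present."""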
if upload_url.startswith("/"):
full_url = urljoin(base_url, upload_url)
else:
full_url = upload_url
parsed = urlparse(full_url)
filename = sanitize_filename(os.path.basename(parsed.path))
local_path = os.path.join(target_dir, filename)
if os.path.exists(local_path):
return local_path
try:
r = session.get(full_url, stream=True)
r.raise_for_status()
with open(local_path, "wb") as file_obj:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
file_obj.write(chunk)
return local_path
    except requests.exceptions.HTTPError as err:
        status = err.response.status_code if err.response is not None else "unknown"
        print(f"WARN: failed to download upload ({status}): {full_url}")
    except requests.exceptions.RequestException as err:
        print(f"WARN: network error downloading upload: {full_url} ({err})")
    # Remove any partially written file so a rerun does not mistake it for a
    # complete cached download.
    if os.path.exists(local_path):
        os.remove(local_path)
    return None
def convert_to_webp(local_path, webp_dir, quality):
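    """Re-encode a downloaded image as WEBP; returns the new path, or None if skipped/failed."""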
ext = os.path.splitext(local_path)[1].lower()
if ext not in IMAGE_EXTENSIONS:
return None
if Image is None:
return None
base_name = os.path.splitext(os.path.basename(local_path))[0]
webp_path = os.path.join(webp_dir, f"{base_name}.webp")
if os.path.exists(webp_path):
return webp_path
try:
with Image.open(local_path) as image:
image = ImageOps.exif_transpose(image)
if image.mode in ("RGBA", "LA", "P"):
image = image.convert("RGBA")
else:
image = image.convert("RGB")
image.save(webp_path, "WEBP", quality=quality, method=6)
return webp_path
except Exception as err: # pylint: disable=broad-except
print(f"WARN: could not convert to WEBP {local_path}: {err}")
return None
def get_stream_id(session, base_url, stream_name):
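    """Resolve a stream name to its numeric id via GET /api/v1/get_stream_id."""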
url = urljoin(base_url, "/api/v1/get_stream_id")
response = session.get(url, params={"stream": stream_name})
if response.status_code >= 400:
print("WARN: could not resolve stream_id for topic verification")
return None
data = response.json()
return data.get("stream_id")
def get_stream_topics(session, base_url, stream_id):
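    """Return the set of topic names in a stream via GET /api/v1/users/me/{stream_id}/topics."""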
url = urljoin(base_url, f"/api/v1/users/me/{stream_id}/topics")
response = session.get(url)
if response.status_code >= 400:
print("WARN: could not fetch stream topics for verification")
return set()
data = response.json()
return {topic.get("name", "") for topic in data.get("topics", []) if topic.get("name")}
def transform_message_content(session, base_url, content, uploads_dir, images_original_dir, out_dir):
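    """Download a message's /user_uploads/ attachments and rewrite their URLs to local relative paths."""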
uploads = set(USER_UPLOADS_RE.findall(content))
replacements = {}
message_local_paths = set()
for upload_url in uploads:
target_dir = images_original_dir if is_image_upload_url(upload_url) else uploads_dir
local_path = download_upload(session, base_url, upload_url, target_dir)
if not local_path:
continue
rel_path = to_web_path(os.path.relpath(local_path, out_dir))
replacements[upload_url] = rel_path
message_local_paths.add(rel_path)
transformed = content
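    # Replace longer URLs first so a URL that is a prefix of another one is
    # not clobbered by a partial replacement.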
for original_url, rel_path in sorted(replacements.items(), key=lambda item: len(item[0]), reverse=True):
transformed = transformed.replace(original_url, rel_path)
    # Strip HTML tags to build a plain-text search string.
    text_for_search = re.sub(r'<[^>]+>', ' ', transformed)
text_for_search = " ".join(text_for_search.split())
return transformed, text_for_search, message_local_paths
def prepare_messages_json(messages, session, base_url, uploads_dir, images_original_dir, out_dir):
js_messages = []
all_local_paths = set()
for message in messages:
sender = message["sender_full_name"]
topic = message["subject"] or "(no topic)"
timestamp = datetime.fromtimestamp(message["timestamp"]).strftime("%Y-%m-%d %H:%M")
raw_content = message["content"]
transformed_content, search_text, message_local_paths = transform_message_content(
session=session,
base_url=base_url,
content=raw_content,
uploads_dir=uploads_dir,
images_original_dir=images_original_dir,
out_dir=out_dir,
)
all_local_paths.update(message_local_paths)
combined_search = f"{sender} {topic} {search_text}"
js_messages.append({
"id": message["id"],
"sender": escape(sender),
"topic": escape(topic),
"date": timestamp,
"day": timestamp[:10],
"content": transformed_content,
"search": combined_search,
"_local_paths": sorted(message_local_paths),
})
return js_messages, all_local_paths
def convert_uploads_to_webp_parallel(local_rel_paths, out_dir, webp_dir, quality, workers):
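    """Convert downloaded images to WEBP on a thread pool; returns {original rel path: webp rel path}."""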
if Image is None:
return {}
rel_paths = sorted(local_rel_paths)
if not rel_paths:
return {}
webp_map = {}
def convert_one(rel_path):
local_path = os.path.join(out_dir, rel_path.replace("/", os.sep))
webp_path = convert_to_webp(local_path, webp_dir, quality)
if not webp_path:
return rel_path, None
webp_rel_path = to_web_path(os.path.relpath(webp_path, out_dir))
return rel_path, webp_rel_path
max_workers = max(1, workers)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {executor.submit(convert_one, rel_path): rel_path for rel_path in rel_paths}
for future in concurrent.futures.as_completed(future_map):
rel_path, webp_rel_path = future.result()
if webp_rel_path:
webp_map[rel_path] = webp_rel_path
return webp_map
def apply_webp_replacements(js_messages, webp_map):
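    """Rewrite message HTML to point at the WEBP copies wherever conversion succeeded."""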
for message in js_messages:
content = message["content"]
local_paths = message.pop("_local_paths", [])
for original_rel_path in local_paths:
webp_rel_path = webp_map.get(original_rel_path)
if not webp_rel_path:
continue
content = content.replace(original_rel_path, webp_rel_path)
message["content"] = content
return js_messages
def build_html(meta, messages_json_filename, render_chunk_size):
return build_html_with_messages(meta, messages_json_filename, render_chunk_size, None)
def build_html_with_messages(meta, messages_json_filename, render_chunk_size, messages_payload):
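    """Fill the page template; messages are embedded inline unless messages_payload is None."""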
embedded_json = "null"
if messages_payload is not None:
        # Escape "</" so the embedded JSON cannot close the surrounding <script> tag.
        embedded_json = json.dumps({"messages": messages_payload}, ensure_ascii=False).replace("</", "<\\/")
return (
HTML_TEMPLATE
.replace("__META__", escape(meta))
.replace("__MESSAGES_JSON__", messages_json_filename)
.replace("__RENDER_CHUNK_SIZE__", str(max(1, render_chunk_size)))
.replace("__EMBEDDED_MESSAGES__", embedded_json)
)
def verify_topics_if_needed(session, base_url, stream_name, exported_messages, topic_filter):
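    """Cross-check exported topics against the stream's topic list (full-stream exports only)."""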
if topic_filter:
return
stream_id = get_stream_id(session, base_url, stream_name)
if stream_id is None:
return
api_topics = get_stream_topics(session, base_url, stream_id)
exported_topics = {msg.get("subject", "") for msg in exported_messages if msg.get("subject")}
    missing = sorted(api_topics - exported_topics)
print(f"INFO: topics detected in API: {len(api_topics)}")
print(f"INFO: topics present in export: {len(exported_topics)}")
if missing:
print(f"WARN: missing {len(missing)} topics in export (sample): {missing[:10]}")
else:
print("OK: export includes all detectable stream topics.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Export Zulip messages to HTML with optimized attachments.")
parser.add_argument("--base-url", required=True, help="Zulip base URL, e.g. https://zulip.yourcompany.com")
parser.add_argument("--email", required=True, help="Your Zulip email")
parser.add_argument("--api-key", required=True, help="Your Zulip API key")
parser.add_argument("--stream", required=True, help="Stream name")
parser.add_argument("--topic", default=None, help="Topic name (optional)")
parser.add_argument("--out", default="zulip_export", help="Output directory")
parser.add_argument("--chunk-size", type=int, default=120, help="Messages per JS render chunk")
parser.add_argument("--embed-html", dest="embed_html", action="store_true", default=True, help="Embed messages in generated HTML (default: enabled)")
parser.add_argument("--no-embed-html", dest="embed_html", action="store_false", help="Do not embed messages; load from generated JSON")
parser.add_argument("--webp-quality", type=int, default=72, help="WEBP quality (1-100)")
parser.add_argument("--webp-workers", type=int, default=max(2, (os.cpu_count() or 4)), help="Worker threads for WEBP conversion")
parser.add_argument("--delete-original-images", action="store_true", help="Delete uploads_originalimages after successful WEBP conversion")
args = parser.parse_args()
base_url = args.base_url.rstrip("/")
out_dir = args.out
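    # Attachment layout under the output dir: uploads/ holds non-image files,
    # uploads_originalimages/ holds images as downloaded, and uploads_webp/
    # holds the converted copies.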
uploads_dir = os.path.join(out_dir, "uploads")
images_original_dir = os.path.join(out_dir, "uploads_originalimages")
webp_dir = os.path.join(out_dir, "uploads_webp")
ensure_dir(out_dir)
ensure_dir(uploads_dir)
ensure_dir(images_original_dir)
ensure_dir(webp_dir)
if Image is None:
print("WARN: Pillow is not installed. Original images will be used without WEBP conversion.")
session = requests.Session()
session.auth = (args.email, args.api_key)
narrow = [{"operator": "stream", "operand": args.stream}]
if args.topic:
narrow.append({"operator": "topic", "operand": args.topic})
all_messages = []
anchor = "newest"
while True:
data = zulip_get_messages(session, base_url, narrow, anchor=anchor, num_before=1000, num_after=0)
messages = data.get("messages", [])
if not messages:
break
all_messages.extend(messages)
oldest_id = messages[0]["id"]
anchor = oldest_id - 1
        # Prefer the server's found_oldest flag; the short-page check covers
        # responses that omit it.
        if data.get("found_oldest") or len(messages) < 1000:
            break
time.sleep(0.2)
    # Chronological order; message id breaks ties within the same second.
    all_messages = sorted(all_messages, key=lambda msg: (msg["timestamp"], msg["id"]))
verify_topics_if_needed(
session=session,
base_url=base_url,
stream_name=args.stream,
exported_messages=all_messages,
topic_filter=args.topic,
)
meta = (
f"Stream: {args.stream}"
+ (f" · Topic: {args.topic}" if args.topic else " · All topics")
+ f" · Total: {len(all_messages)}"
)
webp_quality = max(1, min(100, args.webp_quality))
messages_payload, local_paths = prepare_messages_json(
messages=all_messages,
session=session,
base_url=base_url,
uploads_dir=uploads_dir,
images_original_dir=images_original_dir,
out_dir=out_dir,
)
webp_map = convert_uploads_to_webp_parallel(
local_rel_paths=local_paths,
out_dir=out_dir,
webp_dir=webp_dir,
quality=webp_quality,
workers=args.webp_workers,
)
messages_payload = apply_webp_replacements(messages_payload, webp_map)
if Image is not None:
print(f"INFO: WEBP conversions available: {len(webp_map)}")
if args.delete_original_images:
original_image_paths = {path for path in local_paths if path.startswith("uploads_originalimages/")}
failed_image_paths = original_image_paths - set(webp_map.keys())
if Image is None:
print("WARN: --delete-original-images skipped because Pillow is not installed.")
elif failed_image_paths:
print(f"WARN: --delete-original-images skipped. {len(failed_image_paths)} image(s) still use originals.")
elif os.path.isdir(images_original_dir):
shutil.rmtree(images_original_dir, ignore_errors=True)
print(f"OK: deleted {images_original_dir}")
stream_base = sanitize_output_basename(args.stream) or "zulip_export"
if args.topic:
topic_base = sanitize_output_basename(args.topic)
if topic_base:
stream_base = f"{stream_base}_{topic_base}"
messages_json_filename = f"{stream_base}.json"
html_filename = f"{stream_base}.html"
messages_json_path = os.path.join(out_dir, messages_json_filename)
with open(messages_json_path, "w", encoding="utf-8") as file_obj:
json.dump({"messages": messages_payload}, file_obj, ensure_ascii=False)
html = build_html_with_messages(
meta=meta,
messages_json_filename=messages_json_filename,
render_chunk_size=args.chunk_size,
messages_payload=messages_payload if args.embed_html else None,
)
html_path = os.path.join(out_dir, html_filename)
with open(html_path, "w", encoding="utf-8") as file_obj:
file_obj.write(html)
print(f"OK: {html_path}")
print(f"OK: {messages_json_path}")