Advanced use cases
This chapter covers patterns that go beyond the basic read-transform-write
workflow documented in How-to guides. Each section focuses on something the
how-to guides either don’t cover at all or only mention in passing:
deferring I/O with LazyAsset, distributing work across
processes with Celery, content-addressed deduplication, custom streaming
backends, real-time progress reporting, fine-tuning encoder settings, and
writing your own Processor.
Deferred loading with LazyAsset
Problem
A standard Asset holds the raw essence bytes in memory.
For a 500 MB video file that means a 500 MB payload every time the asset
passes through a message broker (Celery, RQ, …), gets pickled into a cache,
or is stored in a database column. A payload of that size usually exceeds the
broker's message-size limit, and even where it fits it is slow to transfer.
Solution
LazyAsset stores only a URI string and the metadata
dict. The raw bytes are fetched on demand: each access to .essence calls a
loader function you provide (see the note below about caching the result).
from madam.core import LazyAsset
def s3_loader(uri: str):
    """Return a readable BytesIO stream for an s3:// URI.

    Args:
        uri: Object location in the form ``s3://<bucket>/<key>``.

    Returns:
        An ``io.BytesIO`` holding the object's bytes, rewound to offset 0.

    Raises:
        ValueError: If *uri* does not start with ``s3://`` or is missing the
            bucket or key component.
    """
    scheme = 's3://'
    # Validate before doing any I/O: blindly slicing off five characters
    # would turn e.g. 'https://host/key' into a bogus bucket/key pair.
    if not uri.startswith(scheme):
        raise ValueError(f'Not an s3:// URI: {uri!r}')
    bucket, _, key = uri[len(scheme):].partition('/')
    if not bucket or not key:
        raise ValueError(f'S3 URI must look like s3://bucket/key: {uri!r}')

    import io

    import boto3

    buf = io.BytesIO()
    boto3.client('s3').download_fileobj(bucket, key, buf)
    buf.seek(0)  # rewind so the caller reads from the start
    return buf
asset = LazyAsset(
uri='s3://my-bucket/uploads/video.mp4',
loader=s3_loader,
mime_type='video/mp4',
duration=120.0,
width=1920,
height=1080,
)
The loader is never serialised. Calling pickle.dumps(asset) produces
a small payload that contains only the URI and the metadata mapping:
import pickle
data = pickle.dumps(asset) # a few hundred bytes, not 500 MB
restored = pickle.loads(data)
print(restored.uri) # 's3://my-bucket/uploads/video.mp4'
print(restored.mime_type) # 'video/mp4'
After unpickling the loader is None. Re-attach a loader before
accessing the essence:
restored._loader = s3_loader # re-attach in the worker process
frame = restored.essence.read(1024) # fetches from S3 now
A LazyAsset is a drop-in replacement for a regular Asset: processors, operators, and pipelines accept it wherever they accept an Asset:
from madam import Madam
madam = Madam()
processor = madam.get_processor(asset) # O(1) — uses asset.mime_type
thumbnail = processor.resize(width=320, height=240)(asset)
Note
Because every call to .essence invokes the loader, cache the result
locally if you need to read the stream more than once:
import io
data = io.BytesIO(asset.essence.read()) # one round-trip
# use data as many times as you like
Distributed processing with Celery
The combination of LazyAsset’s small pickle payload and
MADAM’s pure-function operators makes Celery integration straightforward.
Basic task
# tasks.py
import io
import boto3
from celery import Celery
from madam import Madam
from madam.core import LazyAsset, OperatorError
app = Celery('tasks', broker='redis://localhost:6379/0')
madam = Madam({'image/jpeg': {'quality': 80}})
def s3_loader(uri: str):
    """Download the object behind an s3:// URI into memory.

    Args:
        uri: Object location in the form ``s3://<bucket>/<key>``.

    Returns:
        An ``io.BytesIO`` holding the object's bytes, rewound to offset 0.

    Raises:
        ValueError: If *uri* does not start with ``s3://`` or is missing the
            bucket or key component.
    """
    scheme = 's3://'
    # Validate before doing any I/O: blindly slicing off five characters
    # would turn e.g. 'https://host/key' into a bogus bucket/key pair.
    if not uri.startswith(scheme):
        raise ValueError(f'Not an s3:// URI: {uri!r}')
    bucket, _, key = uri[len(scheme):].partition('/')
    if not bucket or not key:
        raise ValueError(f'S3 URI must look like s3://bucket/key: {uri!r}')

    buf = io.BytesIO()
    boto3.client('s3').download_fileobj(bucket, key, buf)
    buf.seek(0)  # rewind so the caller reads from the start
    return buf
def s3_upload(asset, bucket: str, key: str) -> str:
    """Upload an asset's essence to S3 and return the object's s3:// URI."""
    client = boto3.client('s3')
    client.upload_fileobj(asset.essence, bucket, key)
    destination = f's3://{bucket}/{key}'
    return destination
@app.task(bind=True, max_retries=3)
def resize_and_store(self, uri: str, width: int, height: int):
    """Celery task: resize the JPEG at *uri* and upload it as a thumbnail.

    The result is stored in ``my-bucket`` under
    ``thumbnails/<width>x<height>/<source file name>`` and the value returned
    by ``s3_upload`` is passed back to the caller. An ``OperatorError``
    triggers a retry with a countdown of ``2 ** retries``.
    """
    # The loader is never pickled, so the lazy asset is built in the worker.
    source = LazyAsset(uri=uri, loader=s3_loader, mime_type='image/jpeg')
    try:
        resize = madam.get_processor(source).resize(width=width, height=height)
        thumbnail = resize(source)
        filename = uri.split("/")[-1]
        target_key = f'thumbnails/{width}x{height}/{filename}'
        return s3_upload(thumbnail, 'my-bucket', target_key)
    except OperatorError as exc:
        backoff = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=backoff)
Calling the task:
# In your web handler or upload handler:
resize_and_store.delay('s3://my-bucket/uploads/photo.jpg', 320, 240)
Fan-out: generating multiple renditions in parallel
Submit one task per rendition instead of one large sequential task. All workers read the same source lazily from S3, so there is no wasted bandwidth for renditions that fail:
from celery import group
RENDITIONS = [
{'width': 1920, 'height': 1080, 'suffix': '1080p'},
{'width': 1280, 'height': 720, 'suffix': '720p'},
{'width': 640, 'height': 360, 'suffix': '360p'},
{'width': 320, 'height': 240, 'suffix': 'thumb'},
]
def enqueue_renditions(source_uri: str):
    """Submit one ``resize_and_store`` task per ``RENDITIONS`` entry.

    The task signatures are bundled into a Celery ``group`` and dispatched
    together; the object returned by ``apply_async()`` is handed back.
    """
    signatures = [
        resize_and_store.s(source_uri, spec['width'], spec['height'])
        for spec in RENDITIONS
    ]
    return group(signatures).apply_async()
Chaining tasks with a Pipeline
For multi-step processing, build the pipeline once and share it across tasks:
from madam.core import Pipeline
from madam.image import ResizeMode
# Build once at module level — operators are plain callables, safe to share
_pipeline = Pipeline()
_pipeline.add(madam.get_processor('image/jpeg').resize(
width=1280, height=720, mode=ResizeMode.FILL))
_pipeline.add(madam.get_processor('image/jpeg').convert(mime_type='image/webp'))
@app.task
def transcode_to_webp(uri: str):
    """Celery task: run the shared pipeline on *uri* and upload its first result.

    The output is stored in ``my-bucket`` under ``webp/<source file name>``.
    """
    source = LazyAsset(uri=uri, loader=s3_loader, mime_type='image/jpeg')
    outputs = list(_pipeline.process(source))
    first = outputs[0]
    filename = uri.split("/")[-1]
    return s3_upload(first, 'my-bucket', f'webp/{filename}')
Tip
Use TransientOperatorError to drive Celery retries and
PermanentOperatorError for dead-letter routing:
from madam.core import (
TransientOperatorError,
PermanentOperatorError,
UnsupportedFormatError,
)
@app.task(bind=True, max_retries=5)
def safe_transcode(self, uri: str):
    """Celery task: convert the MP4 at *uri* to WebM and upload the result.

    Returns whatever ``s3_upload`` returns on success, or ``None`` when the
    format is unsupported or the failure is permanent. Permanent failures are
    handed to ``send_to_dead_letter``; transient ones are retried with a
    countdown of ``2 ** retries``.
    """
    source = LazyAsset(uri=uri, loader=s3_loader, mime_type='video/mp4')
    try:
        to_webm = madam.get_processor(source).convert(mime_type='video/webm')
        transcoded = to_webm(source)
        return s3_upload(transcoded, 'my-bucket', uri.split('/')[-1])
    except UnsupportedFormatError:
        # Unsupported input is not an error worth reporting: skip silently.
        return None
    except PermanentOperatorError:
        send_to_dead_letter(uri)
        return None
    except TransientOperatorError as exc:
        delay = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=delay)
Content-addressed storage and deduplication
Every Asset exposes a content_id
property that is the hex-encoded SHA-256 digest of the raw essence bytes. Use
it as a storage key to deduplicate identical files automatically:
from madam.core import FileSystemAssetStorage
storage = FileSystemAssetStorage('/data/assets')
for path in incoming_files:
with open(path, 'rb') as f:
asset = madam.read(f)
# Identical content → same key → no duplicate entry
storage[asset.content_id] = (asset, {'raw'})
Retrieve by content ID or check existence cheaply:
cid = '3a7bd3e2...'
if cid in storage:
asset, tags = storage[cid]
For LazyAsset, content_id is computed on first
access and cached; subsequent calls are O(1):
lazy = LazyAsset(uri='s3://bucket/image.jpg', loader=s3_loader,
mime_type='image/jpeg')
cid = lazy.content_id # fetches once, hashes, caches
cid2 = lazy.content_id # returns cached value — no fetch
Custom streaming output backends
The how-to guide shows how to write HLS/DASH segments to a local directory
with DirectoryOutput. This section covers the two
other built-in backends and how to implement your own.
Writing to a zip archive (in-memory)
Useful for testing, for download APIs that stream the archive, or for uploading all segments to object storage as a single atomic batch:
import io
import zipfile
from madam.streaming import ZipOutput
buf = io.BytesIO()
with ZipOutput(buf) as output:
proc.to_hls(asset, output=output, segment_duration=6)
# buf now holds a zip with the manifest and all segments
buf.seek(0)
with zipfile.ZipFile(buf) as zf:
print(zf.namelist()) # ['index.m3u8', 'segment_000.ts', …]
Writing to object storage
Implement MultiFileOutput to write directly to any
backend. The interface requires only a single write method:
import boto3
from madam.streaming import MultiFileOutput
class S3Output(MultiFileOutput):
    """MultiFileOutput that stores every written file as an S3 object."""

    def __init__(self, bucket: str, prefix: str):
        """Remember the target bucket and key prefix and create an S3 client."""
        self._bucket = bucket
        self._prefix = prefix
        self._client = boto3.client('s3')

    def write(self, relative_path: str, data: bytes) -> None:
        """Store *data* under the key ``<prefix>/<relative_path>``."""
        request = {
            'Bucket': self._bucket,
            'Key': f'{self._prefix}/{relative_path}',
            'Body': data,
        }
        self._client.put_object(**request)
output = S3Output('my-bucket', 'hls/video1')
proc.to_hls(asset, output=output, segment_duration=4)
MPEG-DASH works the same way — replace to_hls with to_dash:
proc.to_dash(asset, output=S3Output('my-bucket', 'dash/video1'),
segment_duration=4)
Both methods accept optional video and audio dicts that mirror the
convert() operator options:
from madam.video import VideoCodec
from madam.audio import AudioCodec
proc.to_hls(
asset,
output=output,
segment_duration=6,
video={'codec': VideoCodec.H264, 'bitrate': 2000},
audio={'codec': AudioCodec.AAC, 'bitrate': 128},
)
Real-time transcoding progress
Pass a progress_callback to convert()
to receive periodic progress updates during long transcoding jobs. The
callback receives a dict with keys like frame, fps,
out_time, and bitrate:
from madam import Madam
madam = Madam()
with open('video.mp4', 'rb') as f:
asset = madam.read(f)
proc = madam.get_processor(asset)
def on_progress(info: dict) -> None:
    """Print the current frame and output timestamp of a running job.

    Keys missing from *info* are shown as ``?``.
    """
    current = {key: info.get(key, '?') for key in ('frame', 'out_time')}
    print(f"frame={current['frame']} time={current['out_time']}")
result = proc.convert(
mime_type='video/webm',
progress_callback=on_progress,
)(asset)
When no progress_callback is supplied, MADAM uses subprocess.run
(simpler, lower overhead). When a callback is supplied it switches to
subprocess.Popen and reads the progress stream line by line.
Celery + progress with a result backend
Combine the callback with Celery’s update_state to expose live progress
through the result backend:
@app.task(bind=True)
def transcode_video(self, uri: str, target_mime: str):
    """Celery task: transcode the MP4 at *uri* to *target_mime* with progress.

    While the conversion runs, each progress update is published through
    ``update_state`` with state ``PROGRESS``. The result is uploaded to
    ``my-bucket`` under the source's file name.
    """
    source = LazyAsset(uri=uri, loader=s3_loader, mime_type='video/mp4')

    def on_progress(info: dict) -> None:
        # Forward only the two fields that clients poll for.
        snapshot = {
            'out_time': info.get('out_time', ''),
            'frame': info.get('frame', ''),
        }
        self.update_state(state='PROGRESS', meta=snapshot)

    convert = madam.get_processor(source).convert(
        mime_type=target_mime,
        progress_callback=on_progress,
    )
    transcoded = convert(source)
    return s3_upload(transcoded, 'my-bucket', uri.split('/')[-1])
Fine-tuning encoder settings
Pass a configuration mapping to Madam to control
encoder defaults. Keys are MIME type strings (or short names); values are
dicts of options:
from madam import Madam
madam = Madam({
'image/jpeg': {'quality': 85, 'progressive': True},
'image/png': {'optimize': True, 'zopfli': True},
'image/webp': {'quality': 80, 'method': 6},
'image/avif': {'quality': 70, 'speed': 4},
'image/gif': {'optimize': True},
'image/tiff': {'compression': 'tiff_lzw'},
'ffmpeg': {'threads': 8},
'video': {'keyframe_interval': 60},
})
Image options
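| MIME type | Key | Values / meaning |
|---|---|---|
| `image/jpeg` | `quality` | Integer 1–95 (Pillow default 75) |
| `image/jpeg` | `progressive` | Boolean (`True` / `False`) |
| `image/png` | `optimize` | Boolean (`True` / `False`) |
| `image/png` | `zopfli` | Enable Zopfli compression (requires the optional Zopfli dependency) |
| `image/webp` | `quality` | Integer 1–100 |
| `image/webp` | `method` | 0 (fastest) – 6 (best compression) |
| `image/avif` | `quality` | Integer 0–100 (higher = better quality, larger file) |
| `image/avif` | `speed` | 0 (slowest/best) – 10 (fastest/worst) |
| `image/gif` | `optimize` | Boolean (`True` / `False`) |
| `image/tiff` | `compression` | Compression scheme name, e.g. `'tiff_lzw'` |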
Video / FFmpeg options
Key |
Effect |
|---|---|
`threads` (under `'ffmpeg'`) |
Number of FFmpeg threads (default: CPU count) |
`keyframe_interval` (under `'video'`) |
Keyframe interval in frames (default 100) |
|
Disable |
Per-codec CRF overrides are set with a 'codec/<libname>' key:
madam = Madam({
'codec/libx264': {'crf': 23}, # H.264 (default 23)
'codec/libvpx-vp9': {'crf': 33}, # VP9
'codec/libx265': {'crf': 28}, # H.265
})
Note
The Madam() constructor does not reject unrecognised configuration keys.
Unrecognised keys under image MIME types emit a UserWarning;
unrecognised FFmpeg keys are silently ignored.
Writing a custom Processor
To add support for a new file format, subclass Processor
and override the two abstract methods plus the supported_mime_types
property. Register the new processor with Madam so that
get_processor() finds it automatically.
Minimal example
import io
from madam.core import Asset, Processor, UnsupportedFormatError, operator
class NetpbmProcessor(Processor):
    """Read/write Netpbm PBM/PGM/PPM files."""

    # Two-byte magic number -> MIME type. P1-P3 are the plain (ASCII)
    # variants, P4-P6 the raw (binary) ones.
    MAGIC = {b'P1': 'image/x-portable-bitmap',
             b'P2': 'image/x-portable-graymap',
             b'P3': 'image/x-portable-pixmap',
             b'P4': 'image/x-portable-bitmap',
             b'P5': 'image/x-portable-graymap',
             b'P6': 'image/x-portable-pixmap'}

    @property
    def supported_mime_types(self) -> frozenset:
        """MIME types this processor handles (the distinct MAGIC values)."""
        return frozenset(self.MAGIC.values())

    def can_read(self, file: io.IOBase) -> bool:
        """Return True if *file* starts with a known Netpbm magic number.

        The stream position is restored afterwards, even if reading fails,
        so the caller can go on to read the file from where it was.
        """
        start = file.tell()
        try:
            magic = file.read(2)
        finally:
            # Restore the caller's offset rather than assuming it was 0.
            file.seek(start)
        return magic in self.MAGIC

    def read(self, file: io.IOBase) -> Asset:
        """Read a Netpbm file and return it as an Asset.

        Args:
            file: Binary stream positioned at the start of the image.

        Returns:
            An Asset carrying the raw bytes plus ``mime_type``, ``width``
            and ``height`` metadata.

        Raises:
            UnsupportedFormatError: If the magic number is unknown or the
                header does not contain two integer dimensions.
        """
        data = file.read()
        magic = data[:2]
        if magic not in self.MAGIC:
            raise UnsupportedFormatError('Not a Netpbm file')
        mime_type = self.MAGIC[magic]
        w, h = self._parse_dimensions(data)
        return Asset._from_bytes(data, mime_type=mime_type, width=w, height=h)

    @staticmethod
    def _parse_dimensions(data: bytes) -> tuple:
        """Return ``(width, height)`` parsed from a Netpbm header.

        Header fields are whitespace-separated and may be spread over
        several lines or share the magic number's line; ``#`` starts a
        comment that runs to the end of the line.
        """
        fields = []
        for line in data[2:].split(b'\n'):
            fields.extend(line.split(b'#', 1)[0].split())
            if len(fields) >= 2:
                break  # width and height found; the rest is not needed
        try:
            width, height = (int(field) for field in fields[:2])
        except ValueError as err:
            # Covers both non-numeric fields and fewer than two fields.
            raise UnsupportedFormatError('Malformed Netpbm header') from err
        return width, height
Adding operators with @operator
Use the operator() decorator to create lazy, reusable
operators. Calling the method returns a configured callable
(Asset → Asset), not the result directly:
from madam.core import operator
class NetpbmProcessor(Processor):
    ...

    @operator
    def invert(self, asset: Asset) -> Asset:
        """Return a new asset with all pixel values inverted.

        The image is converted to RGB and written as PPM, so the result is
        always a pixmap, whatever Netpbm variant the source was.
        """
        import PIL.Image

        img = PIL.Image.open(asset.essence).convert('RGB')
        inverted = PIL.Image.eval(img, lambda v: 255 - v)
        buf = io.BytesIO()
        inverted.save(buf, format='PPM')
        buf.seek(0)  # rewind so the new essence is read from the start
        # The essence is now an RGB PPM: label it as a pixmap instead of
        # copying the source's MIME type, which may be bitmap or graymap.
        return Asset(essence=buf, mime_type='image/x-portable-pixmap',
                     width=asset.width, height=asset.height)
proc = NetpbmProcessor()
invert_op = proc.invert() # returns a callable (not the result)
result = invert_op(ppm_asset) # apply to an asset
Operators returned by @operator can be stored in a
Pipeline and applied in sequence:
from madam.core import Pipeline
pipeline = Pipeline()
pipeline.add(proc.invert())
pipeline.add(proc.invert()) # double invert → original
(result,) = pipeline.process(ppm_asset)
Registering with Madam
from madam import Madam
madam = Madam()
madam.processors.append(NetpbmProcessor())
# The MIME-type index is built in Madam.__init__, so refresh it after
# appending; otherwise the lookup may not see the new processor.
madam._rebuild_mime_index()
# madam.get_processor('image/x-portable-pixmap') now returns your processor
Note
Madam builds its internal MIME-type index at __init__ time. If you
register a processor after construction, call
madam._rebuild_mime_index() (or simply construct a fresh Madam()
with the processor already appended to the processor list) to keep the
O(1) lookup accurate.