Advanced use cases

This chapter covers patterns that go beyond the basic read-transform-write workflow documented in How-to guides. Each section focuses on something the how-to guides either don’t cover at all or only mention in passing: deferring I/O with LazyAsset, distributing work across processes with Celery, content-addressed deduplication, custom streaming backends, real-time progress reporting, fine-tuning encoder settings, and writing your own Processor.

Deferred loading with LazyAsset

Problem

A standard Asset holds the raw essence bytes in memory. For a 500 MB video file, that means a 500 MB payload every time the asset passes through a message broker (Celery, RQ, …), is pickled into a cache, or is stored in a database column. A payload that size usually exceeds the broker's message-size limit, and serialising it is slow even where it fits.

Solution

LazyAsset stores only a URI string and the metadata dict. The raw bytes are fetched on demand: accessing .essence calls a loader function you provide, which returns a readable stream for the URI.

from madam.core import LazyAsset

def s3_loader(uri: str):
    """Return a readable BytesIO stream for an s3:// URI."""
    import io
    import boto3
    bucket, key = uri[5:].split('/', 1)  # strip 's3://'
    buf = io.BytesIO()
    boto3.client('s3').download_fileobj(bucket, key, buf)
    buf.seek(0)
    return buf

asset = LazyAsset(
    uri='s3://my-bucket/uploads/video.mp4',
    loader=s3_loader,
    mime_type='video/mp4',
    duration=120.0,
    width=1920,
    height=1080,
)

The loader is never serialised. Calling pickle.dumps(asset) produces a small payload that contains only the URI and the metadata mapping:

import pickle

data = pickle.dumps(asset)          # a few hundred bytes, not 500 MB
restored = pickle.loads(data)
print(restored.uri)                 # 's3://my-bucket/uploads/video.mp4'
print(restored.mime_type)           # 'video/mp4'

After unpickling, the loader is None. Re-attach a loader before accessing the essence:

restored._loader = s3_loader          # re-attach in the worker process
header = restored.essence.read(1024)  # fetches from S3 now
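The mechanism behind the small pickle can be sketched with the standard library alone. LazyBlob below is a hypothetical stand-in, not MADAM's implementation: it drops its loader in __getstate__, which also sidesteps the fact that loaders such as lambdas and closures cannot be pickled at all.

```python
import io
import pickle
from typing import Callable, Optional

Loader = Callable[[str], io.BytesIO]


class LazyBlob:
    """Hypothetical stand-in for LazyAsset: a URI, metadata, and a loader."""

    def __init__(self, uri: str, loader: Optional[Loader] = None, **metadata):
        self.uri = uri
        self.metadata = metadata
        self._loader = loader

    @property
    def essence(self) -> io.BytesIO:
        if self._loader is None:
            raise RuntimeError('no loader attached; set _loader first')
        return self._loader(self.uri)  # fetch on every access

    def __getstate__(self) -> dict:
        # Pickle everything except the loader: only the URI and metadata travel
        state = self.__dict__.copy()
        state['_loader'] = None
        return state


# Usage: an in-memory dict plays the role of S3
store = {'mem://demo': b'essence bytes'}
blob = LazyBlob('mem://demo', loader=lambda uri: io.BytesIO(store[uri]),
                mime_type='text/plain')
restored = pickle.loads(pickle.dumps(blob))            # works despite the lambda
restored._loader = lambda uri: io.BytesIO(store[uri])  # re-attach in the worker
```

Because __getstate__ works on a copy of the instance dict, the original object keeps its loader; only the serialised copy loses it.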

A LazyAsset is a drop-in replacement for a regular Asset: processors, operators, and pipelines accept it wherever they accept an Asset:

from madam import Madam

madam = Madam()
processor = madam.get_processor(asset)   # O(1): looks up asset.mime_type
resized = processor.resize(width=320, height=240)(asset)

Note

Because every call to .essence invokes the loader, cache the result locally if you need to read the stream more than once:

import io
data = io.BytesIO(asset.essence.read())   # one round-trip
# use data as many times as you like
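If several readers share one LazyAsset, another option is to wrap the loader itself so that each URI is fetched once per process. make_caching_loader below is a hypothetical helper, not part of MADAM. It keeps every fetched file in memory, which defeats the purpose of LazyAsset for large files, so reserve it for small assets that are read repeatedly.

```python
import io
from typing import Callable, Dict


def make_caching_loader(loader: Callable[[str], io.IOBase]) -> Callable[[str], io.BytesIO]:
    """Hypothetical helper: wrap `loader` so each URI is fetched at most once."""
    cache: Dict[str, bytes] = {}

    def cached_loader(uri: str) -> io.BytesIO:
        if uri not in cache:
            cache[uri] = loader(uri).read()  # the only real fetch for this URI
        return io.BytesIO(cache[uri])        # fresh stream, positioned at 0

    return cached_loader


calls = []

def fake_loader(uri: str) -> io.BytesIO:
    calls.append(uri)
    return io.BytesIO(b'abc')


cached = make_caching_loader(fake_loader)
first = cached('s3://bucket/a.jpg').read()
second = cached('s3://bucket/a.jpg').read()
```

You would pass the wrapped loader where a plain one is expected, for example loader=make_caching_loader(s3_loader). Like any loader, it is not serialised, so the cache never reaches the broker.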

Distributed processing with Celery

The combination of LazyAsset’s small pickle payload and MADAM’s pure-function operators makes Celery integration straightforward.

Basic task

# tasks.py
import io
import boto3
from celery import Celery
from madam import Madam
from madam.core import LazyAsset, OperatorError

app = Celery('tasks', broker='redis://localhost:6379/0')
madam = Madam({'image/jpeg': {'quality': 80}})

def s3_loader(uri: str):
    bucket, key = uri[5:].split('/', 1)
    buf = io.BytesIO()
    boto3.client('s3').download_fileobj(bucket, key, buf)
    buf.seek(0)
    return buf

def s3_upload(asset, bucket: str, key: str) -> str:
    boto3.client('s3').upload_fileobj(asset.essence, bucket, key)
    return f's3://{bucket}/{key}'

@app.task(bind=True, max_retries=3)
def resize_and_store(self, uri: str, width: int, height: int):
    # Only the URI string crosses the broker; build the LazyAsset in the worker
    asset = LazyAsset(uri=uri, loader=s3_loader, mime_type='image/jpeg')

    try:
        processor = madam.get_processor(asset)
        result = processor.resize(width=width, height=height)(asset)
        out_key = f'thumbnails/{width}x{height}/{uri.split("/")[-1]}'
        return s3_upload(result, 'my-bucket', out_key)
    except OperatorError as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

Calling the task:

# In your web handler or upload handler:
resize_and_store.delay('s3://my-bucket/uploads/photo.jpg', 320, 240)
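The countdown expression in the task doubles the wait after each failure. self.request.retries is 0 on the first attempt and grows by one with every retry, so with max_retries=3 the task waits 1, 2 and 4 seconds before its three retries. The helper below reproduces that arithmetic and adds a cap; retry_countdown and its cap parameter are hypothetical, not part of Celery.

```python
def retry_countdown(retries: int, base: int = 2, cap: int = 300) -> int:
    """Hypothetical helper: seconds before the next retry, doubling up to `cap`."""
    return min(base ** retries, cap)


# request.retries values seen by a task with max_retries=3: 0, 1, 2
schedule = [retry_countdown(r) for r in range(3)]
```

Inside the task you would then write raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries)), which keeps long retry chains from waiting hours between attempts.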

Fan-out: generating multiple renditions in parallel

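Submit one task per rendition instead of one large sequential task. Each task receives only the source URI and fetches the source from S3 itself, so broker messages stay small, the renditions run in parallel on the available workers, and a failed rendition is retried on its own without repeating the ones that succeeded: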

from celery import group

RENDITIONS = [
    {'width': 1920, 'height': 1080, 'suffix': '1080p'},
    {'width': 1280, 'height': 720,  'suffix': '720p'},
    {'width': 640,  'height': 360,  'suffix': '360p'},
    {'width': 320,  'height': 240,  'suffix': 'thumb'},
]

def enqueue_renditions(source_uri: str):
    job = group(
        resize_and_store.s(source_uri, r['width'], r['height'])
        for r in RENDITIONS
    )
    return job.apply_async()

Chaining tasks with a Pipeline

For multi-step processing within a single task, build a Pipeline once at module level and reuse it in every task invocation:

from madam.core import Pipeline
from madam.image import ResizeMode

# Build once at module level — operators are plain callables, safe to share
_pipeline = Pipeline()
_pipeline.add(madam.get_processor('image/jpeg').resize(
    width=1280, height=720, mode=ResizeMode.FILL))
_pipeline.add(madam.get_processor('image/jpeg').convert(mime_type='image/webp'))

@app.task
def transcode_to_webp(uri: str):
    asset = LazyAsset(uri=uri, loader=s3_loader, mime_type='image/jpeg')
    results = list(_pipeline.process(asset))
    return s3_upload(results[0], 'my-bucket', f'webp/{uri.split("/")[-1]}')

Tip

Use TransientOperatorError to drive Celery retries and PermanentOperatorError for dead-letter routing:

from madam.core import (
    TransientOperatorError,
    PermanentOperatorError,
    UnsupportedFormatError,
)

@app.task(bind=True, max_retries=5)
def safe_transcode(self, uri: str):
    asset = LazyAsset(uri=uri, loader=s3_loader, mime_type='video/mp4')
    try:
        processor = madam.get_processor(asset)
        result = processor.convert(mime_type='video/webm')(asset)
        return s3_upload(result, 'my-bucket', uri.split('/')[-1])
    except UnsupportedFormatError:
        return None           # skip silently
    except PermanentOperatorError:
        send_to_dead_letter(uri)  # your own helper, e.g. push the URI to a dead-letter queue
        return None
    except TransientOperatorError as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

Content-addressed storage and deduplication

Every Asset exposes a content_id property that is the hex-encoded SHA-256 digest of the raw essence bytes. Use it as a storage key to deduplicate identical files automatically:

from madam import Madam
from madam.core import FileSystemAssetStorage

madam = Madam()
storage = FileSystemAssetStorage('/data/assets')

for path in incoming_files:            # your iterable of file paths
    with open(path, 'rb') as f:
        asset = madam.read(f)
    # Identical content → same key, so a duplicate never creates a second entry
    if asset.content_id not in storage:
        storage[asset.content_id] = (asset, {'raw'})
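Since content_id is the hex SHA-256 of the essence bytes, you can compute the same key for a local file without building an Asset, for example to skip files that are already in storage. The sketch assumes the essence MADAM stores is byte-for-byte the file you opened (no re-encoding on read); sha256_hex is a hypothetical helper that reads in chunks so memory use stays flat for large videos.

```python
import hashlib
import io
from typing import BinaryIO


def sha256_hex(stream: BinaryIO, chunk_size: int = 1 << 20) -> str:
    """Hypothetical helper: hex SHA-256 of a stream, read in 1 MiB chunks by default."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: stream.read(chunk_size), b''):
        digest.update(chunk)
    return digest.hexdigest()


# Identical bytes always give the same key, whatever the chunk size
key_a = sha256_hex(io.BytesIO(b'same bytes'))
key_b = sha256_hex(io.BytesIO(b'same bytes'), chunk_size=3)
```

In the loop above, you could hash each opened file with sha256_hex(f) and skip it if the key is already in storage, before paying for madam.read().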

Retrieve by content ID or check existence cheaply:

cid = '3a7bd3e2...'
if cid in storage:
    asset, tags = storage[cid]

For LazyAsset, content_id is computed on first access and cached; subsequent calls are O(1):

lazy = LazyAsset(uri='s3://bucket/image.jpg', loader=s3_loader,
                 mime_type='image/jpeg')
cid = lazy.content_id   # fetches once, hashes, caches
cid2 = lazy.content_id  # returns cached value — no fetch
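The compute-once behaviour described for content_id matches what functools.cached_property provides. HashedLazy below is a hypothetical stand-in, not MADAM's code. It counts loader calls so you can see that only the first access fetches.

```python
import hashlib
import io
from functools import cached_property
from typing import Callable


class HashedLazy:
    """Hypothetical stand-in: hashes the loaded bytes once, then caches the digest."""

    def __init__(self, uri: str, loader: Callable[[str], io.BytesIO]):
        self.uri = uri
        self._loader = loader

    @cached_property
    def content_id(self) -> str:
        # First access fetches and hashes; the result is stored in the instance
        # __dict__, so later accesses never call the loader again
        return hashlib.sha256(self._loader(self.uri).read()).hexdigest()


fetches = []

def counting_loader(uri: str) -> io.BytesIO:
    fetches.append(uri)
    return io.BytesIO(b'image bytes')


lazy = HashedLazy('s3://bucket/image.jpg', counting_loader)
first = lazy.content_id
second = lazy.content_id
```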

Custom streaming output backends

The how-to guide shows how to write HLS/DASH segments to a local directory with DirectoryOutput. This section covers the in-memory ZipOutput backend and shows how to implement your own backend by subclassing MultiFileOutput.

Writing to a zip archive (in-memory)

Useful for testing, for download APIs that stream the archive, or for uploading all segments to object storage as a single atomic batch:

import io
import zipfile
from madam.streaming import ZipOutput

buf = io.BytesIO()
with ZipOutput(buf) as output:
    proc.to_hls(asset, output=output, segment_duration=6)

# buf now holds a zip with the manifest and all segments
buf.seek(0)
with zipfile.ZipFile(buf) as zf:
    print(zf.namelist())   # ['index.m3u8', 'segment_000.ts', …]

Writing to object storage

Subclass MultiFileOutput to write segments directly to any storage backend. The interface requires only one method, write(relative_path, data):

import boto3
from madam.streaming import MultiFileOutput

class S3Output(MultiFileOutput):
    def __init__(self, bucket: str, prefix: str):
        self._bucket = bucket
        self._prefix = prefix
        self._client = boto3.client('s3')

    def write(self, relative_path: str, data: bytes) -> None:
        key = f'{self._prefix}/{relative_path}'
        self._client.put_object(Bucket=self._bucket, Key=key, Body=data)

output = S3Output('my-bucket', 'hls/video1')
proc.to_hls(asset, output=output, segment_duration=4)

MPEG-DASH works the same way — replace to_hls with to_dash:

proc.to_dash(asset, output=S3Output('my-bucket', 'dash/video1'),
             segment_duration=4)

Both methods accept optional video and audio dicts that mirror the convert() operator options:

from madam.video import VideoCodec
from madam.audio import AudioCodec

proc.to_hls(
    asset,
    output=output,
    segment_duration=6,
    video={'codec': VideoCodec.H264, 'bitrate': 2000},
    audio={'codec': AudioCodec.AAC,  'bitrate': 128},
)
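For unit tests, an output that simply records what the packager writes avoids touching disk or S3. MemoryOutput below is a hypothetical, stdlib-only sketch that relies only on the single write(relative_path, data) method described above; in real code you would subclass MultiFileOutput, as S3Output does.

```python
from typing import Dict, List


class MemoryOutput:
    """Hypothetical test double: keeps every written file in a dict."""

    def __init__(self) -> None:
        self.files: Dict[str, bytes] = {}

    def write(self, relative_path: str, data: bytes) -> None:
        # A later write to the same path replaces the earlier content
        self.files[relative_path] = bytes(data)

    def manifests(self) -> List[str]:
        """Paths of HLS (.m3u8) and DASH (.mpd) manifests, sorted."""
        return sorted(p for p in self.files if p.endswith(('.m3u8', '.mpd')))


# Simulate what a packager might write
out = MemoryOutput()
out.write('index.m3u8', b'#EXTM3U\n')
out.write('segment_000.ts', b'\x47\x40')
```

In a test, you would pass such an object as output= to to_hls or to_dash and then assert on manifests() and the segment names.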

Real-time transcoding progress

Pass a progress_callback to convert() to receive periodic progress updates during long transcoding jobs. The callback receives a dict with keys like frame, fps, out_time, and bitrate:

from madam import Madam

madam = Madam()
with open('video.mp4', 'rb') as f:
    asset = madam.read(f)

proc = madam.get_processor(asset)

def on_progress(info: dict) -> None:
    frame = info.get('frame', '?')
    out_time = info.get('out_time', '?')
    print(f'frame={frame}  time={out_time}')

result = proc.convert(
    mime_type='video/webm',
    progress_callback=on_progress,
)(asset)

When no progress_callback is supplied, MADAM uses subprocess.run (simpler, lower overhead). When a callback is supplied, it switches to subprocess.Popen and reads the progress stream line by line.
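The line-by-line reading can be illustrated with FFmpeg's documented -progress output: periodic blocks of key=value lines, each block ending with progress=continue, or progress=end for the final one. parse_progress below sketches that general technique. It is not MADAM's internal code, and it accepts any iterable of lines so it can be tested without FFmpeg.

```python
from typing import Dict, Iterable, Iterator


def parse_progress(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Sketch: group FFmpeg `-progress` key=value lines into one dict per update."""
    block: Dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if '=' not in line:
            continue  # skip blank or unexpected lines
        key, value = line.split('=', 1)
        key, value = key.strip(), value.strip()
        block[key] = value
        if key == 'progress':  # 'continue' or 'end' closes the block
            yield block
            block = {}


sample = [
    'frame=10\n', 'fps=25.0\n', 'out_time=00:00:00.400000\n', 'progress=continue\n',
    'frame=20\n', 'progress=end\n',
]
updates = list(parse_progress(sample))
```

With a real process, you could run FFmpeg with -progress pipe:1 -nostats, open it with subprocess.Popen(..., stdout=subprocess.PIPE, text=True), and iterate over parse_progress(p.stdout).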

Celery + progress with a result backend

Combine the callback with Celery’s update_state to expose live progress through the result backend:

@app.task(bind=True)
def transcode_video(self, uri: str, target_mime: str):
    asset = LazyAsset(uri=uri, loader=s3_loader, mime_type='video/mp4')
    proc = madam.get_processor(asset)

    def on_progress(info: dict) -> None:
        self.update_state(
            state='PROGRESS',
            meta={'out_time': info.get('out_time', ''), 'frame': info.get('frame', '')},
        )

    result = proc.convert(
        mime_type=target_mime,
        progress_callback=on_progress,
    )(asset)
    return s3_upload(result, 'my-bucket', uri.split('/')[-1])
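FFmpeg emits progress updates frequently (every 0.5 s by default), and each update_state call above is a write to the result backend. A hypothetical throttled() wrapper, sketched below with the standard library only, forwards at most one update per interval. Its clock parameter exists only so the behaviour can be tested deterministically.

```python
import time
from typing import Callable, Dict


def throttled(callback: Callable[[Dict], None], min_interval: float = 1.0,
              clock: Callable[[], float] = time.monotonic) -> Callable[[Dict], None]:
    """Hypothetical helper: forward at most one update per `min_interval` seconds."""
    last_sent = float('-inf')

    def wrapper(info: Dict) -> None:
        nonlocal last_sent
        now = clock()
        if now - last_sent >= min_interval:
            last_sent = now
            callback(info)
        # otherwise the update is dropped

    return wrapper


# Deterministic demo with a fake clock
ticks = iter([0.0, 0.3, 1.2, 1.5, 2.3])
seen = []
on_progress = throttled(seen.append, min_interval=1.0, clock=lambda: next(ticks))
for frame in range(5):
    on_progress({'frame': frame})
```

In the task, you would pass progress_callback=throttled(on_progress, min_interval=2.0). The throttle may drop the last update, but the task's return value already signals completion.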

Fine-tuning encoder settings

Pass a configuration mapping to Madam to control encoder defaults. Keys are MIME type strings (or short names); values are dicts of options:

from madam import Madam

madam = Madam({
    'image/jpeg': {'quality': 85, 'progressive': True},
    'image/png':  {'optimize': True, 'zopfli': True},
    'image/webp': {'quality': 80, 'method': 6},
    'image/avif': {'quality': 70, 'speed': 4},
    'image/gif':  {'optimize': True},
    'image/tiff': {'compression': 'tiff_lzw'},
    'ffmpeg':     {'threads': 8},
    'video':      {'keyframe_interval': 60},
})

Image options

==========  ==========================  ====================================================
MIME type   Key                         Values / meaning
==========  ==========================  ====================================================
image/jpeg  quality                     Integer 1–95 (Pillow default 75)
image/jpeg  progressive                 True for progressive JPEG encoding
image/png   optimize                    True to enable Pillow’s optimizer pass
image/png   zopfli / zopfli_strategies  Enable Zopfli compression (requires madam[optimize])
image/webp  quality                     Integer 1–100
image/webp  method                      0 (fastest) – 6 (best compression)
image/avif  quality                     Integer 0–100 (higher = better quality, larger file)
image/avif  speed                       0 (slowest/best) – 10 (fastest/worst)
image/tiff  compression                 'tiff_lzw', 'tiff_deflate', 'jpeg', 'raw'
image/gif   optimize                    True to merge duplicate palette entries
==========  ==========================  ====================================================

Video / FFmpeg options

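=======================================  ===================================================================
Key                                      Effect
=======================================  ===================================================================
'ffmpeg': {'threads': N}                 Number of FFmpeg threads (default: CPU count)
'video': {'keyframe_interval': N}        Keyframe interval in frames (default 100)
'video/quicktime': {'faststart': False}  Disable -movflags +faststart (enabled by default for MP4/QuickTime)
=======================================  ===================================================================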

Per-codec CRF overrides are set with a 'codec/<libname>' key:

madam = Madam({
    'codec/libx264':    {'crf': 23},   # H.264 (default 23)
    'codec/libvpx-vp9': {'crf': 33},   # VP9
    'codec/libx265':    {'crf': 28},   # H.265
})

Note

The Madam() constructor does not reject unknown configuration keys. Unrecognised keys under image MIME types emit a UserWarning; unrecognised FFmpeg keys are silently ignored, so a typo there goes unnoticed.
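Because typos in the FFmpeg sections fail silently, you may want to check a configuration before passing it to Madam. The sketch below is hypothetical. Its allow-list contains only the options documented on this page, so it will also flag options your MADAM version accepts but this page does not list, as well as MIME-type short names if you use them; extend DOCUMENTED_OPTIONS accordingly.

```python
from typing import Dict, List, Mapping, Optional, Set

# Hypothetical allow-list: only the options documented on this page
DOCUMENTED_OPTIONS: Dict[str, Set[str]] = {
    'image/jpeg': {'quality', 'progressive'},
    'image/png': {'optimize', 'zopfli', 'zopfli_strategies'},
    'image/webp': {'quality', 'method'},
    'image/avif': {'quality', 'speed'},
    'image/tiff': {'compression'},
    'image/gif': {'optimize'},
    'ffmpeg': {'threads'},
    'video': {'keyframe_interval'},
    'video/quicktime': {'faststart'},
}


def _allowed(section: str) -> Optional[Set[str]]:
    if section.startswith('codec/'):
        return {'crf'}  # per-codec CRF overrides, e.g. 'codec/libx264'
    return DOCUMENTED_OPTIONS.get(section)


def find_unknown_options(config: Mapping[str, Mapping]) -> List[str]:
    """Return 'section' or 'section:key' for everything not documented here."""
    problems: List[str] = []
    for section, options in config.items():
        allowed = _allowed(section)
        if allowed is None:
            problems.append(section)  # unknown section, e.g. a MIME-type typo
            continue
        problems.extend(f'{section}:{key}' for key in options if key not in allowed)
    return problems
```

You could then fail fast by raising when find_unknown_options(config) returns a non-empty list, before calling Madam(config).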

Writing a custom Processor

To add support for a new file format, subclass Processor and implement its two abstract methods, can_read() and read(), plus the supported_mime_types property. Then register the new processor with Madam so that get_processor() can find it.

Minimal example

import io
from madam.core import Asset, Processor, UnsupportedFormatError, operator

class NetpbmProcessor(Processor):
    """Read/write Netpbm PBM/PGM/PPM files."""

    MAGIC = {b'P1': 'image/x-portable-bitmap',
             b'P2': 'image/x-portable-graymap',
             b'P3': 'image/x-portable-pixmap',
             b'P4': 'image/x-portable-bitmap',
             b'P5': 'image/x-portable-graymap',
             b'P6': 'image/x-portable-pixmap'}

    @property
    def supported_mime_types(self) -> frozenset:
        return frozenset(self.MAGIC.values())

    def can_read(self, file: io.IOBase) -> bool:
        magic = file.read(2)
        file.seek(0)
        return magic in self.MAGIC

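    def read(self, file: io.IOBase) -> Asset:
        data = file.read()
        magic = data[:2]
        if magic not in self.MAGIC:
            raise UnsupportedFormatError('Not a Netpbm file')
        mime_type = self.MAGIC[magic]
        # Parse width and height from the header (simplified): collect
        # whitespace-separated tokens, skipping '#' comments, until we
        # have the magic number, width and height
        tokens = []
        for line in data.split(b'\n'):
            tokens.extend(line.split(b'#', 1)[0].split())
            if len(tokens) >= 3:
                break
        if len(tokens) < 3:
            raise UnsupportedFormatError('Truncated Netpbm header')
        w, h = int(tokens[1]), int(tokens[2])
        return Asset(essence=io.BytesIO(data), mime_type=mime_type,
                     width=w, height=h)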

Adding operators with @operator

Use the operator() decorator to create lazy, reusable operators. Calling the decorated method returns a configured callable (Asset → Asset), not the result directly:

import io
from madam.core import Asset, Processor, operator

class NetpbmProcessor(Processor):
    ...

    @operator
    def invert(self, asset: Asset) -> Asset:
        """Return a new asset with all pixel values inverted."""
        import PIL.Image
        img = PIL.Image.open(asset.essence).convert('RGB')
        inverted = PIL.Image.eval(img, lambda v: 255 - v)
        buf = io.BytesIO()
        inverted.save(buf, format='PPM')
        buf.seek(0)
        # The output is always an RGB PPM, whatever the input flavour
        return Asset(essence=buf, mime_type='image/x-portable-pixmap',
                     width=asset.width, height=asset.height)

proc = NetpbmProcessor()
invert_op = proc.invert()          # returns a callable (not the result)
result = invert_op(ppm_asset)      # apply to an asset
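One way such a decorator can defer work is with functools.partial. The sketch below is an illustration, not MADAM's source: lazy_operator and TextProcessor are hypothetical names, and the operator works on plain strings so it runs without MADAM.

```python
import functools


def lazy_operator(function):
    """Hypothetical @operator-style decorator: configure now, apply later."""
    @functools.wraps(function)
    def configure(self, **options):
        # Bind the processor and the options; the asset is supplied later
        return functools.partial(function, self, **options)
    return configure


class TextProcessor:
    """Hypothetical processor that works on str instead of Asset."""

    @lazy_operator
    def shout(self, asset: str, suffix: str = '!') -> str:
        return asset.upper() + suffix


proc = TextProcessor()
shout_op = proc.shout(suffix='?')   # a callable, not a result
```

Because the configured operator is an ordinary callable that takes one argument, it can be stored, shared across threads, or added to a pipeline without ever touching an asset.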

Operators returned by @operator can be stored in a Pipeline and applied in sequence:

from madam.core import Pipeline

pipeline = Pipeline()
pipeline.add(proc.invert())
pipeline.add(proc.invert())   # double invert → original
(result,) = pipeline.process(ppm_asset)

Registering with Madam

from madam import Madam

madam = Madam()
madam.processors.append(NetpbmProcessor())
madam._rebuild_mime_index()   # refresh the MIME-type index (see the note below)
# madam.get_processor('image/x-portable-pixmap') now returns your processor

Note

Madam builds its internal MIME-type index at __init__ time. If you register a processor after construction, call madam._rebuild_mime_index() (or simply construct a fresh Madam() with the processor already appended to the processor list) to keep the O(1) lookup accurate.
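A plain dict index shows why a late append goes unnoticed. Registry and FakeProcessor below are hypothetical stand-ins, not MADAM classes. The sketch also assumes the first processor registered for a MIME type wins, which may not match MADAM's precedence rules.

```python
from typing import Dict, FrozenSet, List, Optional


class FakeProcessor:
    """Hypothetical processor exposing only supported_mime_types."""

    def __init__(self, name: str, mime_types: FrozenSet[str]):
        self.name = name
        self.supported_mime_types = mime_types


class Registry:
    """Hypothetical registry whose MIME-type index is built in __init__."""

    def __init__(self, processors: List[FakeProcessor]):
        self.processors = list(processors)
        self._rebuild_mime_index()

    def _rebuild_mime_index(self) -> None:
        self._by_mime: Dict[str, FakeProcessor] = {}
        for proc in self.processors:
            for mime_type in proc.supported_mime_types:
                self._by_mime.setdefault(mime_type, proc)  # first one wins

    def get_processor(self, mime_type: str) -> Optional[FakeProcessor]:
        return self._by_mime.get(mime_type)  # O(1) dict lookup


registry = Registry([FakeProcessor('pillow', frozenset({'image/jpeg'}))])
netpbm = FakeProcessor('netpbm', frozenset({'image/x-portable-pixmap'}))
registry.processors.append(netpbm)
stale = registry.get_processor('image/x-portable-pixmap')   # index not updated yet
registry._rebuild_mime_index()
fresh = registry.get_processor('image/x-portable-pixmap')
```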