madam.core module

class madam.core.Asset(essence: IO, **metadata: Any)[source]

Bases: object

Represents a digital asset.

An Asset is an immutable value object whose contents consist of essence and metadata. Essence represents the actual data of a media file, such as the color values of an image, whereas the metadata describes the essence.

Assets should not be instantiated directly. Instead, use Madam.read() to obtain an Asset representing the content of a file.

__init__(essence: IO, **metadata: Any) None[source]

Initializes a new Asset with the specified essence and metadata.

Parameters:
  • essence (IO) – The essence of the asset as a file-like object

  • **metadata – The metadata describing the essence

property content_id: str

Returns a stable, content-addressed identifier for this asset’s essence.

The identifier is the SHA-256 hex digest of the raw essence bytes and is independent of metadata. Two assets with identical bytes always have the same content_id, making it suitable as an object-store key or deduplication handle.

Added in version 0.23.
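A minimal sketch of how such an identifier can be computed, assuming only what is stated above (SHA-256 over the raw essence bytes, metadata ignored). The helper name sha256_content_id is hypothetical and not part of madam; the library's own implementation may differ.

```python
import hashlib
import io
from typing import BinaryIO


def sha256_content_id(essence: BinaryIO, chunk_size: int = 65536) -> str:
    """Hypothetical helper: hex SHA-256 digest of a binary stream, read in chunks."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: essence.read(chunk_size), b''):
        digest.update(chunk)
    return digest.hexdigest()


# Identical bytes give identical identifiers; metadata never enters the hash.
id_a = sha256_content_id(io.BytesIO(b'GIF89a'))
id_b = sha256_content_id(io.BytesIO(b'GIF89a'))
id_c = sha256_content_id(io.BytesIO(b'GIF87a'))
```

Reading in chunks keeps memory use flat for large essences, which matters when the identifier is used as a deduplication handle for video files.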

property essence: IO

Represents the actual content of the asset.

The essence of an MP3 file, for example, consists only of the actual audio data, whereas information such as ID3 tags is stored separately as metadata.

class madam.core.AssetStorage[source]

Bases: MutableMapping[AssetKey, tuple[Asset, frozenset[str]]], Generic[AssetKey]

Represents an abstract base class for data stores of Asset objects.

All implementations of AssetStorage require a constructor.

The persistence guarantees for stored data may differ based on the respective storage implementation.

abstractmethod __init__() None[source]

Initializes a new AssetStorage.

clear() None.  Remove all items from D.
filter(**kwargs: Any) Iterable[AssetKey][source]

Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.

Parameters:

**kwargs – Criteria defined as keys and values

Returns:

Sequence of asset keys

Return type:

Iterable

filter_by_tags(*tags: str) Iterable[AssetKey][source]

Returns a set of all asset keys in this storage that have at least the specified tags.

Parameters:

*tags – Mandatory tags of an asset to be included in result

Returns:

Keys of the assets whose tags are a superset of the specified tags

Return type:

Iterable
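The superset rule described for filter_by_tags() can be illustrated with plain Python sets. This is a sketch under stated assumptions: the dictionary below stands in for a storage, strings stand in for Asset objects, and keys_with_tags is a hypothetical helper, not a madam function.

```python
from typing import Any, Dict, FrozenSet, Set, Tuple

# Stand-in for a storage: key -> (asset, tags). Strings stand in for Asset objects.
entries: Dict[str, Tuple[Any, FrozenSet[str]]] = {
    'a': ('asset-a', frozenset({'photo', 'holiday'})),
    'b': ('asset-b', frozenset({'photo'})),
    'c': ('asset-c', frozenset({'audio'})),
}


def keys_with_tags(store: Dict[str, Tuple[Any, FrozenSet[str]]], *tags: str) -> Set[str]:
    """Hypothetical helper: keys whose tag set is a superset of the requested tags."""
    required = frozenset(tags)
    return {key for key, (_, asset_tags) in store.items() if required <= asset_tags}


photo_keys = keys_with_tags(entries, 'photo')
both_keys = keys_with_tags(entries, 'photo', 'holiday')
all_keys = keys_with_tags(entries)  # no tags requested: every key qualifies
```

An asset may carry more tags than requested and still match; only missing tags exclude it.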

get(k[, d]) D[k] if k in D, else d.  d defaults to None.
items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

popitem() (k, v), remove and return some (key, value) pair

as a 2-tuple; but raise KeyError if D is empty.

setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
update([E, ]**F) None.  Update D from mapping/iterable E and F.

If E present and has a .keys() method, does: for k in E: D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v

values() an object providing a view on D's values
class madam.core.FileSystemAssetStorage(path: Path | str)[source]

Bases: AssetStorage[str]

A persistent AssetStorage that writes each asset as two files on the filesystem:

  • <key>/essence — raw essence bytes

  • <key>/metadata.json — JSON-encoded metadata and tags

The storage is designed to work on any POSIX mount point, including network file systems (NFS, CIFS) and object-store-backed FUSE mounts (e.g. s3fs, rclone). Asset keys must be valid directory-name strings (no path separators).

Because files are written atomically (write to a temp file then rename), the storage is safe for concurrent writes from multiple Celery workers on a shared file system.

Added in version 0.23.
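The write-then-rename pattern mentioned above can be sketched with the standard library alone. The helper names atomic_write and store_asset, and the exact JSON layout of metadata.json, are assumptions made for illustration; they are not taken from madam.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write(target: Path, data: bytes) -> None:
    """Write data to a temp file in the target directory, then rename it into place."""
    target.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=target.parent)
    try:
        with os.fdopen(fd, 'wb') as tmp:
            tmp.write(data)
        os.replace(tmp_name, target)  # atomic on POSIX within one file system
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def store_asset(root: Path, key: str, essence: bytes, metadata: dict, tags: set) -> None:
    """Hypothetical helper mirroring the <key>/essence and <key>/metadata.json layout."""
    if os.sep in key or '/' in key:
        raise ValueError('asset keys must not contain path separators')
    atomic_write(root / key / 'essence', essence)
    # Assumed JSON shape; the real file format is not specified in this reference.
    payload = json.dumps({'metadata': metadata, 'tags': sorted(tags)})
    atomic_write(root / key / 'metadata.json', payload.encode('utf-8'))


with tempfile.TemporaryDirectory() as tmp_dir:
    root = Path(tmp_dir)
    store_asset(root, 'logo', b'GIF89a', {'mime_type': 'image/gif'}, {'brand'})
    stored_essence = (root / 'logo' / 'essence').read_bytes()
    stored_meta = json.loads((root / 'logo' / 'metadata.json').read_text('utf-8'))
    leftover = sorted(p.name for p in (root / 'logo').iterdir())
```

Creating the temp file in the same directory as the target keeps the rename on one file system, which is what makes the replacement atomic for readers.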

__init__(path: Path | str) None[source]

Initializes a new FileSystemAssetStorage rooted at path.

The directory is created if it does not already exist.

Parameters:

path – Root directory for stored assets.

clear() None.  Remove all items from D.
filter(**kwargs: Any) Iterable[AssetKey]

Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.

Parameters:

**kwargs – Criteria defined as keys and values

Returns:

Sequence of asset keys

Return type:

Iterable

filter_by_tags(*tags: str) Iterable[AssetKey]

Returns a set of all asset keys in this storage that have at least the specified tags.

Parameters:

*tags – Mandatory tags of an asset to be included in result

Returns:

Keys of the assets whose tags are a superset of the specified tags

Return type:

Iterable

get(k[, d]) D[k] if k in D, else d.  d defaults to None.
items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

popitem() (k, v), remove and return some (key, value) pair

as a 2-tuple; but raise KeyError if D is empty.

setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
update([E, ]**F) None.  Update D from mapping/iterable E and F.

If E present and has a .keys() method, does: for k in E: D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v

values() an object providing a view on D's values
class madam.core.InMemoryStorage[source]

Bases: IndexedAssetStorage[Any]

Represents a non-persistent storage backend for Asset objects.

Assets are not serialized, but stored in memory.

__init__() None[source]

Initializes a new, empty InMemoryStorage object.

clear() None.  Remove all items from D.
filter(**kwargs: Any) Iterable[AssetKey]

Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.

Parameters:

**kwargs – Criteria defined as keys and values

Returns:

Sequence of asset keys

Return type:

Iterable

filter_by_tags(*tags: str) Iterable[AssetKey]

Returns a set of all asset keys in this storage that have at least the specified tags.

Parameters:

*tags – Mandatory tags of an asset to be included in result

Returns:

Keys of the assets whose tags are a superset of the specified tags

Return type:

Iterable

get(k[, d]) D[k] if k in D, else d.  d defaults to None.
items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

popitem() (k, v), remove and return some (key, value) pair

as a 2-tuple; but raise KeyError if D is empty.

setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
update([E, ]**F) None.  Update D from mapping/iterable E and F.

If E present and has a .keys() method, does: for k in E: D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v

values() an object providing a view on D's values
class madam.core.IndexedAssetStorage[source]

Bases: AssetStorage[AssetKey]

Mixin that maintains an in-memory inverted index over scalar metadata values.

Makes filter() O(k) where k is the number of matching assets instead of O(n·c) for n stored assets and c filter criteria.

Subclasses must call _index_asset() in __setitem__ and _deindex_asset() in __delitem__.

Added in version 0.23.
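The idea of an inverted index over scalar metadata values can be sketched as follows. The class InvertedIndex and its method names are hypothetical and only loosely mirror _index_asset() and _deindex_asset(); the real mixin may be structured differently.

```python
from collections import defaultdict
from typing import Any, Dict, Hashable, Set, Tuple

SCALARS = (str, int, float, bool)


class InvertedIndex:
    """Hypothetical sketch: maps (metadata key, value) pairs to sets of asset keys."""

    def __init__(self) -> None:
        self._index: Dict[Tuple[str, Hashable], Set[str]] = defaultdict(set)

    def index(self, asset_key: str, metadata: Dict[str, Any]) -> None:
        for name, value in metadata.items():
            if isinstance(value, SCALARS):  # only scalar values are indexed
                self._index[(name, value)].add(asset_key)

    def deindex(self, asset_key: str, metadata: Dict[str, Any]) -> None:
        for name, value in metadata.items():
            if isinstance(value, SCALARS):
                self._index[(name, value)].discard(asset_key)

    def filter(self, **criteria: Any) -> Set[str]:
        # Intersect the posting sets of all criteria; no scan over stored assets.
        postings = [self._index.get((name, value), set()) for name, value in criteria.items()]
        # Returning an empty set for empty criteria is an arbitrary choice in this sketch.
        return set.intersection(*postings) if postings else set()


idx = InvertedIndex()
idx.index('a', {'mime_type': 'image/png', 'width': 100})
idx.index('b', {'mime_type': 'image/png', 'width': 200})
idx.index('c', {'mime_type': 'audio/wav'})
png_keys = idx.filter(mime_type='image/png')
wide_png_keys = idx.filter(mime_type='image/png', width=200)
idx.deindex('b', {'mime_type': 'image/png', 'width': 200})
png_after_delete = idx.filter(mime_type='image/png')
```

Keeping the index in step with the stored assets is the reason subclasses have to update it on every insertion and deletion.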

__init__() None[source]

Initializes a new AssetStorage.

clear() None.  Remove all items from D.
filter(**kwargs: Any) Iterable[AssetKey][source]

Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.

Parameters:

**kwargs – Criteria defined as keys and values

Returns:

Sequence of asset keys

Return type:

Iterable

filter_by_tags(*tags: str) Iterable[AssetKey]

Returns a set of all asset keys in this storage that have at least the specified tags.

Parameters:

*tags – Mandatory tags of an asset to be included in result

Returns:

Keys of the assets whose tags are a superset of the specified tags

Return type:

Iterable

get(k[, d]) D[k] if k in D, else d.  d defaults to None.
items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

popitem() (k, v), remove and return some (key, value) pair

as a 2-tuple; but raise KeyError if D is empty.

setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
update([E, ]**F) None.  Update D from mapping/iterable E and F.

If E present and has a .keys() method, does: for k in E: D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v

values() an object providing a view on D's values
class madam.core.LazyAsset(uri: str, loader: Callable[[str], IO] | None, **metadata: Any)[source]

Bases: Asset

An Asset that stores only a URI instead of raw bytes.

Essence bytes are fetched on demand by calling the loader callable. Because the raw bytes are never stored in the object, pickle.dumps produces a payload that contains only the URI and metadata — safe to send through a Celery broker even for large video files.

Parameters:
  • uri – Opaque string identifying the remote content (e.g. an S3 URI).

  • loader – Callable (uri: str) -> IO that returns a readable stream for the given URI. May be None to create a detached asset that will raise on essence access.

  • **metadata – Metadata describing the asset.

Added in version 0.23.
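The on-demand loading behaviour can be illustrated with a small stand-in class. LazyAssetSketch and fake_loader are hypothetical names for this sketch; it does not reproduce madam's implementation and only shows that no bytes are fetched until essence is accessed, and that a detached asset raises.

```python
import io
from typing import IO, Any, Callable, List, Optional


class LazyAssetSketch:
    """Hypothetical stand-in for illustration; not madam's implementation."""

    def __init__(self, uri: str, loader: Optional[Callable[[str], IO]], **metadata: Any) -> None:
        self.uri = uri
        self._loader = loader
        self.metadata = dict(metadata)

    @property
    def essence(self) -> IO:
        if self._loader is None:
            raise RuntimeError('detached asset: no loader configured')
        return self._loader(self.uri)  # bytes are fetched only now


fetched: List[str] = []


def fake_loader(uri: str) -> IO:
    """Pretend remote fetch that records which URIs were requested."""
    fetched.append(uri)
    return io.BytesIO(b'remote bytes')


asset = LazyAssetSketch('s3://bucket/video.mp4', fake_loader, mime_type='video/mp4')
fetched_before_access = list(fetched)  # still empty: nothing loaded at construction
data = asset.essence.read()

detached = LazyAssetSketch('s3://bucket/video.mp4', None)
try:
    detached.essence
    detached_error = None
except RuntimeError as exc:
    detached_error = str(exc)
```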

__init__(uri: str, loader: Callable[[str], IO] | None, **metadata: Any) None[source]

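Initializes a new LazyAsset with the specified URI, loader, and metadata.

Parameters:
  • uri (str) – Opaque string identifying the remote content

  • loader (Callable[[str], IO] | None) – Callable that returns a readable stream for the given URI, or None to create a detached asset

  • **metadata – The metadata describing the essence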

property content_id: str

Returns a stable, content-addressed identifier for this asset’s essence.

The identifier is the SHA-256 hex digest of the raw essence bytes and is independent of metadata. Two assets with identical bytes always have the same content_id, making it suitable as an object-store key or deduplication handle.

Added in version 0.23.

property essence: IO

Fetches and returns the asset content from the configured loader.

Raises:

RuntimeError – if no loader was provided at construction time.

property uri: str

The URI that identifies the remote content.

class madam.core.Madam(config: Mapping[str, Any] | None = None)[source]

Bases: object

Represents an instance of the library.

__init__(config: Mapping[str, Any] | None = None) None[source]

Initializes a new library instance with default configuration.

The default configuration includes a list of all available Processor and MetadataProcessor implementations.

Parameters:

config – Mapping with settings.

get_processor(source: Asset | IO | str) Processor[source]

Returns a processor that can handle the given source.

Three calling forms are supported:

  • get_processor(asset) — fast O(1) lookup by asset.mime_type; falls back to byte-probing the essence when the MIME type is not in the index.

  • get_processor('image/jpeg') — fast O(1) lookup by MIME type string.

  • get_processor(file) — slow byte-probe loop (same as before).

Parameters:

source – An Asset, a MIME type string, or a file-like object.

Raises:

UnsupportedFormatError – if no processor can handle the given source.

Returns:

Processor that can handle the source.

Return type:

Processor
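The difference between the indexed lookup and the byte-probe loop can be sketched as below. FakeProcessor, mime_index, and find_processor are hypothetical names, and LookupError stands in for UnsupportedFormatError so that the sketch runs without madam installed.

```python
import io
from typing import IO, Dict, List, Union


class FakeProcessor:
    """Hypothetical processor that recognises files by a magic-byte prefix."""

    def __init__(self, mime_type: str, magic: bytes) -> None:
        self.supported_mime_types = frozenset({mime_type})
        self._magic = magic

    def can_read(self, file: IO) -> bool:
        position = file.tell()
        try:
            return file.read(len(self._magic)) == self._magic
        finally:
            file.seek(position)  # leave the stream where we found it


processors: List[FakeProcessor] = [
    FakeProcessor('image/gif', b'GIF8'),
    FakeProcessor('image/png', b'\x89PNG'),
]
# MIME type -> processor index, built once from supported_mime_types.
mime_index: Dict[str, FakeProcessor] = {
    mime: proc for proc in processors for mime in proc.supported_mime_types
}


def find_processor(source: Union[str, IO]) -> FakeProcessor:
    if isinstance(source, str):
        if source in mime_index:
            return mime_index[source]  # O(1) dictionary lookup
        raise LookupError(f'no processor for {source!r}')
    for proc in processors:  # slow path: probe the bytes with each processor
        if proc.can_read(source):
            return proc
    raise LookupError('no processor can read this file')


by_mime = find_processor('image/png')
by_bytes = find_processor(io.BytesIO(b'GIF89a\x01\x00\x01\x00'))
```

The probe path costs one read per registered processor, which is why passing an asset or a MIME type string is preferable when the type is already known.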

read(file: IO, additional_metadata: Mapping | None = None)[source]

Reads the specified file and returns its contents as an Asset object.

Parameters:
  • file (IO) – file-like object to be parsed

  • additional_metadata (Mapping) – optional metadata for the resulting asset. Existing metadata entries extracted from the file will be overwritten.

Returns:

Asset representing the specified file

Return type:

Asset

Raises:

UnsupportedFormatError – if no processor can handle the file

Example:

>>> import io
>>> from madam import Madam
>>> manager = Madam()
>>> file = io.BytesIO(b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01'
... b'\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\nIDATx\x9cc\x00\x01\x00\x00\x05\x00'
... b'\x01\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82')
>>> asset = manager.read(file)
strip(asset: Asset) Asset[source]

Returns a copy of the asset with all embedded metadata removed from both the essence bytes and the metadata dict.

Structural properties such as mime_type, width, height, and duration are preserved. Format-specific metadata (exif, xmp, iptc, ffmetadata, rdf, …) and the derived created_at key are dropped.

Parameters:

asset (Asset) – Asset to strip

Returns:

New asset without metadata

Return type:

Asset

Raises:

UnsupportedFormatError – if the asset format is not supported

Added in version 0.25.

write(asset: Asset, file: IO) None[source]

Write the Asset object to the specified file.

Parameters:
  • asset (Asset) – Asset that contains the data to be written

  • file (IO) – file-like object to be written

Example:

>>> import io
>>> import os
>>> from madam import Madam
>>> from madam.core import Asset
>>> gif_asset = Asset(essence=io.BytesIO(b'GIF89a\x01\x00\x01\x00\x00\x00\x00;'), mime_type='image/gif')
>>> manager = Madam()
>>> with open(os.devnull, 'wb') as file:
...     manager.write(gif_asset, file)
>>> wav_asset = Asset(
...     essence=io.BytesIO(b'RIFF$\x00\x00\x00WAVEfmt \x10\x00\x00\x00\x01\x00\x01\x00D\xac'
...             b'\x00\x00\x88X\x01\x00\x02\x00\x10\x00data\x00\x00\x00\x00'),
...     mime_type='audio/wav')
>>> with open(os.devnull, 'wb') as file:
...     manager.write(wav_asset, file)
class madam.core.MetadataProcessor(config: Mapping[str, Any] | None = None)[source]

Bases: object

Represents an entity that can manipulate metadata.

Every MetadataProcessor needs to have an __init__ method with an optional config parameter in order to be registered correctly.

abstractmethod __init__(config: Mapping[str, Any] | None = None) None[source]

Initializes a new MetadataProcessor.

abstractmethod combine(file: IO, metadata: Mapping) IO[source]

Returns a byte stream whose contents represent the specified file where the specified metadata was added.

Parameters:
  • metadata (Mapping) – Mapping of the metadata format to the metadata dict

  • file (IO) – Container file

Returns:

file-like object with combined content

Return type:

IO

abstract property formats: Iterable[str]

The metadata formats which are supported.

Returns:

supported metadata formats

Return type:

Iterable[str]

abstractmethod read(file: IO) Mapping[source]

Reads the file and returns the metadata.

The returned metadata is grouped by metadata format: each key is the name of a format, and its value holds the metadata of that format.

Parameters:

file (IO) – File-like object to be read

Returns:

Metadata contained in the file

Return type:

Mapping

Raises:

UnsupportedFormatError – if the data is corrupt or its format is not supported

abstractmethod strip(file: IO) IO[source]

Removes all metadata of the supported type from the specified file.

Parameters:

file (IO) – file-like object that should be stripped of its metadata

Returns:

file-like object without metadata

Return type:

IO

exception madam.core.OperatorError(*args)[source]

Bases: Exception

Represents an error that is raised when an operator() fails.

__init__(*args)[source]

Initializes a new OperatorError.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception madam.core.PermanentOperatorError(*args)[source]

Bases: OperatorError

Raised when an operator fails due to a permanent condition (e.g. invalid codec, corrupt input) that will never succeed on retry.

Added in version 0.23.

__init__(*args)

Initializes a new OperatorError.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class madam.core.Pipeline[source]

Bases: object

Represents a processing pipeline for Asset objects.

The pipeline can be configured to hold a list of asset processing operators, all of which are applied to one or more assets when calling the process() method.

In addition to linear chains of operators, the pipeline supports fan-out via branch() and conditional dispatch via when().

__init__() None[source]

Initializes a new pipeline without operators.

add(operator: Callable) None[source]

Appends the specified operator to the processing chain.

Parameters:

operator – Operator to be added

branch(*pipelines: Pipeline) None[source]

Adds a fan-out step that sends each incoming asset through every sub-pipeline, yielding one output asset per sub-pipeline per input.

Parameters:

*pipelines – Sub-pipelines to fan out into

Added in version 0.24.
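The fan-out behaviour can be illustrated without the library. In this sketch strings stand in for assets, lists of callables stand in for sub-pipelines, and fan_out is a hypothetical helper; the output order shown (per input, then per sub-pipeline) is an assumption.

```python
from typing import Callable, Iterable, Iterator, List

Operator = Callable[[str], str]


def run_chain(chain: List[Operator], asset: str) -> str:
    """Apply a linear chain of operators to one asset."""
    for op in chain:
        asset = op(asset)
    return asset


def fan_out(assets: Iterable[str], *chains: List[Operator]) -> Iterator[str]:
    """Hypothetical fan-out: one output per sub-chain per input asset."""
    for asset in assets:
        for chain in chains:
            yield run_chain(chain, asset)


thumbnail: List[Operator] = [lambda a: a + ':resized(64)']
web: List[Operator] = [lambda a: a + ':resized(1024)', lambda a: a + ':as-webp']
outputs = list(fan_out(['cat', 'dog'], thumbnail, web))
```

Two inputs and two sub-pipelines give four outputs, so the number of assets grows multiplicatively with every branch step.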

static flush() _FlushStep[source]

Return a flush sentinel that forces materialisation at this point.

Inserting a flush step between two operators that belong to the same processor causes the pipeline to end the current deferred run and begin a fresh one. Use this when an intermediate encode/decode cycle is required (e.g. to stabilise file size or byte layout).

Returns:

A _FlushStep sentinel callable

Added in version 1.0.

process(*assets: Asset) Generator[Asset, float, None][source]

Applies the operators in this pipeline on the specified assets.

Consecutive operators that share the same Processor are grouped into a run and dispatched via Processor.execute_run() so that each processor can defer encoding until the run boundary. Untagged callables (plain functions, lambdas) and control-flow steps (_BranchStep, _WhenStep) are treated as materialisation points.

Parameters:

*assets (Asset) – Asset objects to be processed

Returns:

Generator with processed assets
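The grouping of consecutive operators into runs can be sketched as follows. The tuples below are a simplification chosen for this example: each step is reduced to its name and the name of its owning processor, with None marking an untagged callable. group_into_runs is a hypothetical helper, not madam's code.

```python
from typing import List, Optional, Tuple

# (operator name, owning processor or None for an untagged callable)
steps: List[Tuple[str, Optional[str]]] = [
    ('resize', 'image'),
    ('crop', 'image'),
    ('log', None),
    ('rotate', 'image'),
    ('convert', 'video'),
    ('trim', 'video'),
]


def group_into_runs(
    steps: List[Tuple[str, Optional[str]]],
) -> List[Tuple[Optional[str], List[str]]]:
    """Group consecutive steps with the same processor; untagged steps stand alone."""
    runs: List[Tuple[Optional[str], List[str]]] = []
    for name, processor in steps:
        continues_run = bool(runs) and processor is not None and runs[-1][0] == processor
        if continues_run:
            runs[-1][1].append(name)
        else:
            runs.append((processor, [name]))  # new run: a materialisation boundary
    return runs


runs = group_into_runs(steps)
```

The untagged log step splits the image operators into two runs, so the image would be encoded twice here; moving such steps to the start or end of a chain keeps runs long.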

when(predicate: Callable[[Asset], bool], then: Callable[[Asset], Asset], else_: Callable[[Asset], Asset] | None = None) None[source]

Adds a conditional step that applies then when predicate returns True and else_ (if given) otherwise. When predicate returns False and no else_ is provided, the asset passes through unchanged.

Parameters:
  • predicate – Callable that receives an asset and returns a bool

  • then – Operator applied when predicate is True

  • else_ – Operator applied when predicate is False; optional

Added in version 0.24.
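The dispatch rule of a conditional step can be sketched in a few lines. Dicts stand in for assets, and when_step is a hypothetical helper that mirrors the documented semantics rather than madam's internals.

```python
from typing import Callable, Optional

AssetLike = dict


def when_step(
    predicate: Callable[[AssetLike], bool],
    then: Callable[[AssetLike], AssetLike],
    else_: Optional[Callable[[AssetLike], AssetLike]] = None,
) -> Callable[[AssetLike], AssetLike]:
    """Hypothetical helper building a conditional operator."""
    def step(asset: AssetLike) -> AssetLike:
        if predicate(asset):
            return then(asset)
        if else_ is not None:
            return else_(asset)
        return asset  # no else_ branch: pass through unchanged
    return step


def is_large(asset: AssetLike) -> bool:
    return asset['width'] > 1000


def shrink(asset: AssetLike) -> AssetLike:
    return {**asset, 'width': 1000}


def tag_small(asset: AssetLike) -> AssetLike:
    return {**asset, 'small': True}


shrink_if_large = when_step(is_large, shrink)
shrink_or_tag = when_step(is_large, shrink, tag_small)

large_result = shrink_if_large({'width': 4000})
small_result = shrink_if_large({'width': 200})
tagged_result = shrink_or_tag({'width': 200})
```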

class madam.core.ProcessingContext[source]

Bases: ABC

Represents the deferred in-memory state of an asset being processed.

Consecutive operators that share the same Processor are grouped into a run by Pipeline. The processor accumulates each operator’s effect on the context object and only encodes the result once when materialize() is called — either at a processor boundary or at the end of the pipeline.

Subclass this to implement deferred execution for a custom Processor. Override execute_run() on the processor to build and return an instance of your subclass.

Added in version 1.0.

abstractmethod materialize() Asset[source]

Encode and return the final Asset.

abstract property processor: Processor

The Processor that owns this context.
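Deferred execution can be illustrated with a toy context. In this sketch a string plays the role of decoded in-memory state and bytes play the role of the encoded Asset; ContextSketch, DeferredText, and the free function execute_run are hypothetical and only demonstrate that encoding happens once per run.

```python
from abc import ABC, abstractmethod
from typing import Callable, List


class ContextSketch(ABC):
    """Hypothetical counterpart of a processing context."""

    @abstractmethod
    def materialize(self) -> bytes:
        """Encode and return the final result."""


class DeferredText(ContextSketch):
    """Accumulates cheap in-memory edits and encodes only on materialize()."""

    encode_calls = 0  # counts how often the expensive step runs

    def __init__(self, text: str) -> None:
        self.text = text

    def apply(self, step: Callable[[str], str]) -> 'DeferredText':
        self.text = step(self.text)  # no encoding here
        return self

    def materialize(self) -> bytes:
        DeferredText.encode_calls += 1
        return self.text.encode('utf-8')


def execute_run(steps: List[Callable[[str], str]], context: DeferredText) -> DeferredText:
    """Apply every step of a run to the live context without encoding in between."""
    for step in steps:
        context = context.apply(step)
    return context


context = execute_run([str.strip, str.upper, lambda s: s + '!'], DeferredText('  hello '))
result = context.materialize()
```

Three operators ran, but the encode step was executed once, which is the saving a deferred run is meant to provide.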

class madam.core.Processor(config: Mapping[str, Any] | None = None)[source]

Bases: object

Represents an entity that can create Asset objects from binary data.

Every Processor needs to have an __init__ method with an optional config parameter in order to be registered correctly.

abstractmethod __init__(config: Mapping[str, Any] | None = None) None[source]

Initializes a new Processor.

Parameters:

config – Mapping with settings.

abstractmethod can_read(file: IO) bool[source]

Returns whether the data format of the specified file is supported by this processor.

Parameters:

file (IO) – file-like object to be tested

Returns:

whether the data format of the specified file is supported or not

Return type:

bool

execute_run(steps: list[Callable], asset_or_context: Asset | ProcessingContext) Asset | ProcessingContext[source]

Execute a grouped run of consecutive operators from this processor.

The default implementation applies each step sequentially, equivalent to the old per-step behaviour. Subclasses may override this to defer encoding: accumulate each operator’s effect into a ProcessingContext and return it; Pipeline will call ProcessingContext.materialize() at the next processor boundary or at the end of the pipeline.

Parameters:
  • steps – Ordered list of tagged operator callables in this run.

  • asset_or_context – Input asset (or live context from a preceding run of the same processor).

Returns:

Processed Asset or a live ProcessingContext.

Added in version 1.0.

abstractmethod read(file: IO) Asset[source]

Returns an Asset object whose essence is identical to the contents of the specified file.

Parameters:

file (IO) – file-like object to be read

Returns:

Asset with essence

Return type:

Asset

Raises:

UnsupportedFormatError – if the specified data format is not supported

property supported_mime_types: frozenset

MIME types this processor can handle (used to build the Madam index).

Added in version 0.24.

class madam.core.ShelveStorage(path: Path | str)[source]

Bases: AssetStorage[str]

Represents a persistent storage backend for Asset objects. Asset keys must be strings.

ShelveStorage uses a file on the file system to serialize Assets.

__init__(path: Path | str)[source]

Initializes a new ShelveStorage with the specified path.

Parameters:

path (pathlib.Path or str) – File system path where the data should be stored

clear() None.  Remove all items from D.
filter(**kwargs: Any) Iterable[AssetKey]

Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.

Parameters:

**kwargs – Criteria defined as keys and values

Returns:

Sequence of asset keys

Return type:

Iterable

filter_by_tags(*tags: str) Iterable[AssetKey]

Returns a set of all asset keys in this storage that have at least the specified tags.

Parameters:

*tags – Mandatory tags of an asset to be included in result

Returns:

Keys of the assets whose tags are a superset of the specified tags

Return type:

Iterable

get(k[, d]) D[k] if k in D, else d.  d defaults to None.
items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

popitem() (k, v), remove and return some (key, value) pair

as a 2-tuple; but raise KeyError if D is empty.

setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
update([E, ]**F) None.  Update D from mapping/iterable E and F.

If E present and has a .keys() method, does: for k in E: D[k] = E[k] If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v In either case, this is followed by: for k, v in F.items(): D[k] = v

values() an object providing a view on D's values
exception madam.core.TransientOperatorError(*args)[source]

Bases: OperatorError

Raised when an operator fails due to a temporary condition (e.g. OOM, disk full) that may succeed on retry.

Added in version 0.23.

__init__(*args)

Initializes a new OperatorError.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception madam.core.UnsupportedFormatError(*args)[source]

Bases: PermanentOperatorError

Represents an error that is raised whenever file content of an unknown type is encountered.

__init__(*args) None[source]

Initializes a new UnsupportedFormatError.

add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
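The split into transient and permanent errors lends itself to a retry policy. The sketch below redefines the exception hierarchy locally so that it runs without madam installed; run_with_retry is a hypothetical helper, and the retry count is an arbitrary choice.

```python
from typing import Callable, List, Optional, TypeVar

T = TypeVar('T')


# Local stand-ins mirroring the hierarchy documented above.
class OperatorError(Exception):
    pass


class TransientOperatorError(OperatorError):
    pass


class PermanentOperatorError(OperatorError):
    pass


class UnsupportedFormatError(PermanentOperatorError):
    pass


def run_with_retry(operation: Callable[[], T], max_attempts: int = 3) -> T:
    """Retry only on transient errors; permanent errors propagate immediately."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except TransientOperatorError:
            if attempt == max_attempts:
                raise
    raise ValueError('max_attempts must be at least 1')


attempts: List[int] = []


def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientOperatorError('disk full')
    return 'ok'


permanent_attempts: List[int] = []


def broken() -> str:
    permanent_attempts.append(1)
    raise UnsupportedFormatError('unknown format')


flaky_result = run_with_retry(flaky)

permanent_error: Optional[Exception] = None
try:
    run_with_retry(broken)
except PermanentOperatorError as exc:
    permanent_error = exc
```

Because UnsupportedFormatError derives from PermanentOperatorError, a task queue using this policy would not waste retries on files that can never be decoded.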

madam.core.operator(function: Callable[[Concatenate[Any, Asset, _P]], Asset]) Callable[[Concatenate[Any, _P]], Callable[[Asset], Asset]][source]

Decorator function for methods that process assets.

Usually, it will be used with operations in a Processor implementation to make the methods configurable before applying the method to an asset.

Only keyword arguments are allowed for configuration.

Example for using a decorated convert method:

convert_to_opus = processor.convert(mime_type='audio/opus')
convert_to_opus(asset)
Parameters:

function – Method to decorate

Returns:

Configurable method
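The configure-first, apply-later pattern described above can be sketched with a plain decorator. operator_sketch and ToyProcessor are hypothetical; the real decorator may attach additional information (such as the owning processor) that this sketch omits. Dicts stand in for Asset objects.

```python
import functools
from typing import Any, Callable


def operator_sketch(method: Callable[..., dict]) -> Callable[..., Callable[[dict], dict]]:
    """Hypothetical decorator: configure with keyword arguments now, apply later."""
    @functools.wraps(method)
    def configure(self: Any, **kwargs: Any) -> Callable[[dict], dict]:
        def configured(asset: dict) -> dict:
            return method(self, asset, **kwargs)
        return configured
    return configure


class ToyProcessor:
    """Toy processor; dicts stand in for Asset objects."""

    @operator_sketch
    def convert(self, asset: dict, mime_type: str) -> dict:
        return {**asset, 'mime_type': mime_type}


processor = ToyProcessor()
convert_to_opus = processor.convert(mime_type='audio/opus')
converted = convert_to_opus({'mime_type': 'audio/wav', 'duration': 1.5})
```

Since configure() accepts only keyword arguments, passing the configuration positionally raises a TypeError, which matches the keyword-only rule stated above.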