madam.core module
- class madam.core.Asset(essence: IO, **metadata: Any)[source]
Bases:
object
Represents a digital asset.
An Asset is an immutable value object whose contents consist of essence and metadata. Essence represents the actual data of a media file, such as the color values of an image, whereas the metadata describes the essence.
Assets should not be instantiated directly. Instead, use read() to retrieve an Asset representing the content.
- __init__(essence: IO, **metadata: Any) None[source]
Initializes a new Asset with the specified essence and metadata.
- Parameters:
essence (IO) – The essence of the asset as a file-like object
**metadata – The metadata describing the essence
- property content_id: str
Returns a stable, content-addressed identifier for this asset’s essence.
The identifier is the SHA-256 hex digest of the raw essence bytes and is independent of metadata. Two assets with identical bytes always have the same content_id, making it suitable as an object-store key or deduplication handle.
Added in version 0.23.
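The content-addressing idea can be illustrated with the standard library alone. The following is a minimal sketch, not madam's implementation: the helper name sha256_of_stream and the chunk size are assumptions made for this example.

```python
import hashlib
import io
from typing import BinaryIO


def sha256_of_stream(essence: BinaryIO, chunk_size: int = 65536) -> str:
    """Return the SHA-256 hex digest of a readable binary stream (hypothetical helper)."""
    digest = hashlib.sha256()
    # Read in chunks so that large essences never have to fit in memory at once.
    for chunk in iter(lambda: essence.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# Identical bytes give identical identifiers; metadata plays no role.
first = sha256_of_stream(io.BytesIO(b"GIF89a"))
second = sha256_of_stream(io.BytesIO(b"GIF89a"))
other = sha256_of_stream(io.BytesIO(b"GIF87a"))
```

Because the digest depends only on the bytes, first and second are equal while other differs, which is the property that makes such an identifier usable as a deduplication handle.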
- class madam.core.AssetStorage[source]
Bases:
MutableMapping[AssetKey, tuple[Asset, frozenset[str]]], Generic[AssetKey]
Represents an abstract base class for data stores of Asset objects.
All implementations of AssetStorage require a constructor.
The persistence guarantees for stored data may differ based on the respective storage implementation.
- clear() None. Remove all items from D.
- filter(**kwargs: Any) Iterable[AssetKey][source]
Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.
- Parameters:
**kwargs – Criteria defined as keys and values
- Returns:
Sequence of asset keys
- Return type:
Iterable
- filter_by_tags(*tags: str) Iterable[AssetKey][source]
Returns a set of all asset keys in this storage that have at least the specified tags.
- Parameters:
*tags – Mandatory tags of an asset to be included in result
- Returns:
Keys of the assets whose tags are a superset of the specified tags
- Return type:
Iterable
- get(k[, d]) D[k] if k in D, else d. d defaults to None.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
- popitem() (k, v), remove and return some (key, value) pair
as a 2-tuple; but raise KeyError if D is empty.
- setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
- update([E, ]**F) None. Update D from mapping/iterable E and F.
If E is present and has a .keys() method, does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v
- values() an object providing a view on D's values
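The superset rule used by filter_by_tags() can be sketched over a plain dict whose values follow the (asset, tags) shape from the class signature. This is only an illustration under that assumption: the stand-alone function and the string placeholders used as assets are hypothetical.

```python
from typing import Any


def filter_by_tags(entries: dict[str, tuple[Any, frozenset[str]]], *tags: str) -> set[str]:
    """Return the keys whose tag set contains every requested tag."""
    required = frozenset(tags)
    # `required <= asset_tags` is the subset test, i.e. asset_tags is a superset.
    return {key for key, (_asset, asset_tags) in entries.items() if required <= asset_tags}


entries = {
    "a": ("asset-a", frozenset({"photo", "raw"})),
    "b": ("asset-b", frozenset({"photo"})),
}
both_tags = filter_by_tags(entries, "photo", "raw")
one_tag = filter_by_tags(entries, "photo")
```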
- class madam.core.FileSystemAssetStorage(path: Path | str)[source]
Bases:
AssetStorage[str]
A persistent AssetStorage that writes each asset as two files on the filesystem:
- <key>/essence: raw essence bytes
- <key>/metadata.json: JSON-encoded metadata and tags
The storage is designed to work on any POSIX mount point, including network file systems (NFS, CIFS) and object-store-backed FUSE mounts (e.g. s3fs, rclone). Asset keys must be valid directory-name strings (no path separators).
Because files are written atomically (write to a temp file then rename), the storage is safe for concurrent writes from multiple Celery workers on a shared file system.
Added in version 0.23.
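The write-to-temp-then-rename pattern mentioned above can be sketched as follows. This is an illustration under stated assumptions, not the storage's actual code: the helper write_atomically, the key "example-key", and the JSON layout of metadata.json are invented for the example.

```python
import contextlib
import json
import os
import tempfile
from pathlib import Path


def write_atomically(target: Path, data: bytes) -> None:
    """Write data to target so that readers see the old file or the new one, never a partial one."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the destination directory so the rename stays on one file system.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent)
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
        os.replace(tmp_name, target)  # atomic rename on POSIX file systems
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as root:
    asset_dir = Path(root) / "example-key"
    write_atomically(asset_dir / "essence", b"GIF89a")
    # Assumed layout: the real schema of metadata.json is not specified here.
    document = {"metadata": {"mime_type": "image/gif"}, "tags": ["photo"]}
    write_atomically(asset_dir / "metadata.json", json.dumps(document).encode("utf-8"))
    stored_essence = (asset_dir / "essence").read_bytes()
    stored_document = json.loads((asset_dir / "metadata.json").read_text(encoding="utf-8"))
    files_on_disk = sorted(p.name for p in asset_dir.iterdir())
```

Creating the temporary file next to its destination matters: a rename across mount points is not atomic, so a temp file in a system-wide temp directory would defeat the purpose.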
- __init__(path: Path | str) None[source]
Initialises a new FileSystemAssetStorage rooted at path.
The directory is created if it does not already exist.
- Parameters:
path – Root directory for stored assets.
- clear() None. Remove all items from D.
- filter(**kwargs: Any) Iterable[AssetKey]
Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.
- Parameters:
**kwargs – Criteria defined as keys and values
- Returns:
Sequence of asset keys
- Return type:
Iterable
- filter_by_tags(*tags: str) Iterable[AssetKey]
Returns a set of all asset keys in this storage that have at least the specified tags.
- Parameters:
*tags – Mandatory tags of an asset to be included in result
- Returns:
Keys of the assets whose tags are a superset of the specified tags
- Return type:
Iterable
- get(k[, d]) D[k] if k in D, else d. d defaults to None.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
- popitem() (k, v), remove and return some (key, value) pair
as a 2-tuple; but raise KeyError if D is empty.
- setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
- update([E, ]**F) None. Update D from mapping/iterable E and F.
If E is present and has a .keys() method, does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v
- values() an object providing a view on D's values
- class madam.core.InMemoryStorage[source]
Bases:
IndexedAssetStorage[Any]
Represents a non-persistent storage backend for Asset objects.
Assets are not serialized, but stored in memory.
- clear() None. Remove all items from D.
- filter(**kwargs: Any) Iterable[AssetKey]
Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.
- Parameters:
**kwargs – Criteria defined as keys and values
- Returns:
Sequence of asset keys
- Return type:
Iterable
- filter_by_tags(*tags: str) Iterable[AssetKey]
Returns a set of all asset keys in this storage that have at least the specified tags.
- Parameters:
*tags – Mandatory tags of an asset to be included in result
- Returns:
Keys of the assets whose tags are a superset of the specified tags
- Return type:
Iterable
- get(k[, d]) D[k] if k in D, else d. d defaults to None.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
- popitem() (k, v), remove and return some (key, value) pair
as a 2-tuple; but raise KeyError if D is empty.
- setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
- update([E, ]**F) None. Update D from mapping/iterable E and F.
If E is present and has a .keys() method, does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v
- values() an object providing a view on D's values
- class madam.core.IndexedAssetStorage[source]
Bases:
AssetStorage[AssetKey]
Mixin that maintains an in-memory inverted index over scalar metadata values.
Makes filter() O(k), where k is the number of matching assets, instead of O(n·c) for n stored assets and c filter criteria.
Subclasses must call _index_asset() in __setitem__ and _deindex_asset() in __delitem__.
Added in version 0.23.
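An inverted index of this kind can be sketched with a dict of sets. The class below is a simplified stand-in, not the mixin itself: its name, its public method names, and its handling of an empty filter are assumptions made for illustration.

```python
from collections import defaultdict

SCALAR_TYPES = (str, int, float, bool)


class InvertedIndex:
    """Maps (metadata name, scalar value) pairs to the set of asset keys that carry them."""

    def __init__(self):
        self._index = defaultdict(set)
        self._keys = set()

    def index_asset(self, asset_key, metadata):
        self._keys.add(asset_key)
        for name, value in metadata.items():
            if isinstance(value, SCALAR_TYPES):  # nested mappings are not indexed
                self._index[(name, value)].add(asset_key)

    def deindex_asset(self, asset_key, metadata):
        self._keys.discard(asset_key)
        for name, value in metadata.items():
            if isinstance(value, SCALAR_TYPES):
                self._index[(name, value)].discard(asset_key)

    def filter(self, **criteria):
        if not criteria:
            return set(self._keys)  # choice made for this sketch only
        # One dictionary lookup per criterion, then an intersection of the hits.
        hits = [self._index.get(item, set()) for item in criteria.items()]
        return set.intersection(*hits)


index = InvertedIndex()
index.index_asset("a", {"mime_type": "image/png", "width": 4, "exif": {"make": "x"}})
index.index_asset("b", {"mime_type": "image/png", "width": 8})
png_keys = index.filter(mime_type="image/png")
wide_png_keys = index.filter(mime_type="image/png", width=8)
index.deindex_asset("b", {"mime_type": "image/png", "width": 8})
after_removal = index.filter(width=8)
```

The index only stays correct if every insertion and deletion updates it, which is why the mixin requires subclasses to call the index hooks from __setitem__ and __delitem__.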
- clear() None. Remove all items from D.
- filter(**kwargs: Any) Iterable[AssetKey][source]
Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.
- Parameters:
**kwargs – Criteria defined as keys and values
- Returns:
Sequence of asset keys
- Return type:
Iterable
- filter_by_tags(*tags: str) Iterable[AssetKey]
Returns a set of all asset keys in this storage that have at least the specified tags.
- Parameters:
*tags – Mandatory tags of an asset to be included in result
- Returns:
Keys of the assets whose tags are a superset of the specified tags
- Return type:
Iterable
- get(k[, d]) D[k] if k in D, else d. d defaults to None.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
- popitem() (k, v), remove and return some (key, value) pair
as a 2-tuple; but raise KeyError if D is empty.
- setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
- update([E, ]**F) None. Update D from mapping/iterable E and F.
If E is present and has a .keys() method, does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v
- values() an object providing a view on D's values
- class madam.core.LazyAsset(uri: str, loader: Callable[[str], IO] | None, **metadata: Any)[source]
Bases:
Asset
An Asset that stores only a URI instead of raw bytes.
Essence bytes are fetched on demand by calling the loader callable. Because the raw bytes are never stored in the object, pickle.dumps produces a payload that contains only the URI and metadata, which is safe to send through a Celery broker even for large video files.
- Parameters:
uri – Opaque string identifying the remote content (e.g. an S3 URI).
loader – Callable (uri: str) -> IO that returns a readable stream for the given URI. May be None to create a detached asset that will raise on essence access.
**metadata – Metadata describing the asset.
Added in version 0.23.
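The effect on pickle payload size can be demonstrated with a small stand-in class. This sketch does not use LazyAsset and makes no claim about how it implements pickling: UriBackedAsset, FAKE_STORE, and load_from_memory are hypothetical names, and dropping the loader on unpickling is an assumption of the example.

```python
import io
import pickle

# Hypothetical in-memory "remote store" holding one large essence.
FAKE_STORE = {"s3://bucket/clip.mp4": b"\x00" * 1_000_000}


def load_from_memory(uri):
    """Loader with the (uri: str) -> IO shape described above."""
    return io.BytesIO(FAKE_STORE[uri])


class UriBackedAsset:
    """Stand-in that keeps a URI and metadata, never the essence bytes."""

    def __init__(self, uri, loader=None, **metadata):
        self.uri = uri
        self.metadata = metadata
        self._loader = loader

    @property
    def essence(self):
        if self._loader is None:
            raise RuntimeError(f"no loader configured for {self.uri}")
        return self._loader(self.uri)

    def __getstate__(self):
        # Only the URI and metadata are serialized.
        return {"uri": self.uri, "metadata": self.metadata}

    def __setstate__(self, state):
        self.uri = state["uri"]
        self.metadata = state["metadata"]
        self._loader = None  # the receiver would have to attach its own loader


asset = UriBackedAsset("s3://bucket/clip.mp4", load_from_memory, mime_type="video/mp4")
payload = pickle.dumps(asset)
restored = pickle.loads(payload)
essence_size = len(asset.essence.read())
try:
    restored.essence
    detached_raises = False
except RuntimeError:
    detached_raises = True
```

The essence here is one megabyte, while the payload holds only a short URI string and a small metadata dict.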
- __init__(uri: str, loader: Callable[[str], IO] | None, **metadata: Any) None[source]
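Initializes a new LazyAsset with the specified URI, loader, and metadata.
- Parameters:
uri (str) – Opaque string identifying the remote content
loader (Callable[[str], IO] or None) – Callable that returns a readable stream for the given URI, or None for a detached asset
**metadata – The metadata describing the essence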
- property content_id: str
Returns a stable, content-addressed identifier for this asset’s essence.
The identifier is the SHA-256 hex digest of the raw essence bytes and is independent of metadata. Two assets with identical bytes always have the same content_id, making it suitable as an object-store key or deduplication handle.
Added in version 0.23.
- property essence: IO
Fetches and returns the asset content from the configured loader.
- Raises:
RuntimeError – if no loader was provided at construction time.
- class madam.core.Madam(config: Mapping[str, Any] | None = None)[source]
Bases:
object
Represents an instance of the library.
- __init__(config: Mapping[str, Any] | None = None) None[source]
Initializes a new library instance with default configuration.
The default configuration includes a list of all available Processor and MetadataProcessor implementations.
- Parameters:
config – Mapping with settings.
- get_processor(source: Asset | IO | str) Processor[source]
Returns a processor that can handle the given source.
Three calling forms are supported:
- get_processor(asset): fast O(1) lookup by asset.mime_type; falls back to byte-probing the essence when the MIME type is not in the index.
- get_processor('image/jpeg'): fast O(1) lookup by MIME type string.
- get_processor(file): slow byte-probe loop (unchanged from earlier versions).
- Parameters:
source – An Asset, a MIME type string, or a file-like object.
- Raises:
UnsupportedFormatError – if no processor can handle the given source.
- Returns:
Processor that can handle the source.
- Return type:
Processor
- read(file: IO, additional_metadata: Mapping | None = None)[source]
Reads the specified file and returns its contents as an Asset object.
- Parameters:
file (IO) – file-like object to be parsed
additional_metadata (Mapping) – optional metadata for the resulting asset. Existing metadata entries extracted from the file will be overwritten.
- Returns:
Asset representing the specified file
- Return type:
Asset
- Raises:
UnsupportedFormatError – if the file format cannot be recognized or is not supported
TypeError – if the file is None
- Example:
>>> import io
>>> from madam import Madam
>>> manager = Madam()
>>> file = io.BytesIO(b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01'
...                   b'\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\nIDATx\x9cc\x00\x01\x00\x00\x05\x00'
...                   b'\x01\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82')
>>> asset = manager.read(file)
- strip(asset: Asset) Asset[source]
Returns a copy of the asset with all embedded metadata removed from both the essence bytes and the metadata dict.
Structural properties such as mime_type, width, height, and duration are preserved. Format-specific metadata (exif, xmp, iptc, ffmetadata, rdf, …) and the derived created_at key are dropped.
- Parameters:
asset (Asset) – Asset to strip
- Returns:
New asset without metadata
- Return type:
Asset
- Raises:
UnsupportedFormatError – if the asset format is not supported
Added in version 0.25.
- write(asset: Asset, file: IO) None[source]
Writes the Asset object to the specified file.
- Parameters:
asset (Asset) – Asset that contains the data to be written
file (IO) – file-like object to be written
- Example:
>>> import io
>>> import os
>>> from madam import Madam
>>> from madam.core import Asset
>>> gif_asset = Asset(essence=io.BytesIO(b'GIF89a\x01\x00\x01\x00\x00\x00\x00;'), mime_type='image/gif')
>>> manager = Madam()
>>> with open(os.devnull, 'wb') as file:
...     manager.write(gif_asset, file)
>>> wav_asset = Asset(
...     essence=io.BytesIO(b'RIFF$\x00\x00\x00WAVEfmt \x10\x00\x00\x00\x01\x00\x01\x00D\xac'
...                        b'\x00\x00\x88X\x01\x00\x02\x00\x10\x00data\x00\x00\x00\x00'),
...     mime_type='video/mp4')
>>> with open(os.devnull, 'wb') as file:
...     manager.write(wav_asset, file)
- class madam.core.MetadataProcessor(config: Mapping[str, Any] | None = None)[source]
Bases:
object
Represents an entity that can manipulate metadata.
Every MetadataProcessor needs to have an __init__ method with an optional config parameter in order to be registered correctly.
- abstractmethod __init__(config: Mapping[str, Any] | None = None) None[source]
Initializes a new MetadataProcessor.
- abstractmethod combine(file: IO, metadata: Mapping) IO[source]
Returns a byte stream whose contents represent the specified file where the specified metadata was added.
- Parameters:
metadata (Mapping) – Mapping of the metadata format to the metadata dict
file (IO) – Container file
- Returns:
file-like object with combined content
- Return type:
IO
- abstractmethod read(file: IO) Mapping[source]
Reads the file and returns the metadata.
The metadata that is returned is grouped by type. The keys are specified by format.
- Parameters:
file (IO) – File-like object to be read
- Returns:
Metadata contained in the file
- Return type:
Mapping
- Raises:
UnsupportedFormatError – if the data is corrupt or its format is not supported
- exception madam.core.OperatorError(*args)[source]
Bases:
Exception
Represents an error that is raised whenever an error occurs in an operator().
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception madam.core.PermanentOperatorError(*args)[source]
Bases:
OperatorError
Raised when an operator fails due to a permanent condition (e.g. invalid codec, corrupt input) that will never succeed on retry.
Added in version 0.23.
- __init__(*args)
Initializes a new OperatorError.
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class madam.core.Pipeline[source]
Bases:
object
Represents a processing pipeline for Asset objects.
The pipeline can be configured to hold a list of asset processing operators, all of which are applied to one or more assets when calling the process() method.
In addition to linear chains of operators, the pipeline supports fan-out via branch() and conditional dispatch via when().
- add(operator: Callable) None[source]
Appends the specified operator to the processing chain.
- Parameters:
operator – Operator to be added
- branch(*pipelines: Pipeline) None[source]
Adds a fan-out step that sends each incoming asset through every sub-pipeline, yielding one output asset per sub-pipeline per input.
- Parameters:
*pipelines – Sub-pipelines to fan out into
Added in version 0.24.
- static flush() _FlushStep[source]
Return a flush sentinel that forces materialisation at this point.
Inserting a flush step between two operators that belong to the same processor causes the pipeline to end the current deferred run and begin a fresh one. Use this when an intermediate encode/decode cycle is required (e.g. to stabilise file size or byte layout).
- Returns:
A _FlushStep sentinel callable
Added in version 1.0.
- process(*assets: Asset) Generator[Asset, float, None][source]
Applies the operators in this pipeline on the specified assets.
Consecutive operators that share the same Processor are grouped into a run and dispatched via Processor.execute_run() so that each processor can defer encoding until the run boundary. Untagged callables (plain functions, lambdas) and control-flow steps (_BranchStep, _WhenStep) are treated as materialisation points.
- Parameters:
*assets (Asset) – Asset objects to be processed
- Returns:
Generator with processed assets
- when(predicate: Callable[[Asset], bool], then: Callable[[Asset], Asset], else_: Callable[[Asset], Asset] | None = None) None[source]
Adds a conditional step that applies then when predicate returns True and else_ (if given) otherwise. When predicate returns False and no else_ is provided, the asset passes through unchanged.
- Parameters:
predicate – Callable that receives an asset and returns a bool
then – Operator applied when predicate is True
else_ – Operator applied when predicate is False; optional
Added in version 0.24.
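The semantics of branch() and when() can be modelled with two small stand-alone functions. This is a simplified sketch rather than the Pipeline implementation: plain callables stand in for sub-pipelines, dicts stand in for assets, and all names below are illustrative.

```python
def branch(assets, *sub_pipelines):
    """Fan-out: yield one output per sub-pipeline for every incoming asset."""
    for asset in assets:
        for sub_pipeline in sub_pipelines:
            yield sub_pipeline(asset)


def when(predicate, then, else_=None):
    """Conditional step: apply then or else_, or pass the asset through unchanged."""
    def step(asset):
        if predicate(asset):
            return then(asset)
        if else_ is not None:
            return else_(asset)
        return asset
    return step


def to_thumbnail(asset):
    return {**asset, "variant": "thumbnail"}


def to_preview(asset):
    return {**asset, "variant": "preview"}


fanned_out = list(branch([{"name": "a"}, {"name": "b"}], to_thumbnail, to_preview))

shrink_if_wide = when(lambda asset: asset["width"] > 100,
                      lambda asset: {**asset, "width": 100})
shrunk = shrink_if_wide({"width": 300})
untouched = shrink_if_wide({"width": 50})
```

Two inputs sent through two sub-pipelines produce four outputs, grouped by input, and the conditional step leaves the narrow asset unchanged because no else_ operator was given.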
- class madam.core.ProcessingContext[source]
Bases:
ABC
Represents the deferred in-memory state of an asset being processed.
Consecutive operators that share the same Processor are grouped into a run by Pipeline. The processor accumulates each operator’s effect on the context object and encodes the result only once, when materialize() is called, either at a processor boundary or at the end of the pipeline.
Subclass this to implement deferred execution for a custom Processor. Override execute_run() on the processor to build and return an instance of your subclass.
Added in version 1.0.
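The grouping of consecutive operators into runs can be sketched with itertools.groupby. How the library actually tags an operator with its processor is not described here, so the processor attribute used below is a hypothetical marker, as are the helper names.

```python
from itertools import groupby


def group_into_runs(steps):
    """Split a list of callables into (owner, [steps]) runs of consecutive same-owner steps."""
    runs = []
    for owner, members in groupby(steps, key=lambda step: getattr(step, "processor", None)):
        if owner is None:
            # Untagged callables are never merged: each one forms its own run.
            runs.extend((None, [step]) for step in members)
        else:
            runs.append((owner, list(members)))
    return runs


def tagged(owner):
    """Build a no-op operator carrying the hypothetical `processor` tag."""
    def operator(asset):
        return asset
    operator.processor = owner
    return operator


resize, crop, transcode = tagged("image"), tagged("image"), tagged("video")
plain = lambda asset: asset  # untagged callable
runs = group_into_runs([resize, crop, plain, plain, transcode])
run_shape = [(owner, len(members)) for owner, members in runs]
```

In this model each run boundary is a point where an encode would happen, so the two image operators share a single encode while each plain callable forces one of its own.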
- class madam.core.Processor(config: Mapping[str, Any] | None = None)[source]
Bases:
object
Represents an entity that can create Asset objects from binary data.
Every Processor needs to have an __init__ method with an optional config parameter in order to be registered correctly.
- abstractmethod __init__(config: Mapping[str, Any] | None = None) None[source]
Initializes a new Processor.
- Parameters:
config – Mapping with settings.
- abstractmethod can_read(file: IO) bool[source]
Returns whether the data format of the specified file is supported by this processor.
- Parameters:
file (IO) – file-like object to be tested
- Returns:
whether the data format of the specified file is supported or not
- Return type:
bool
- execute_run(steps: list[Callable], asset_or_context: Asset | ProcessingContext) Asset | ProcessingContext[source]
Execute a grouped run of consecutive operators from this processor.
The default implementation applies each step sequentially, equivalent to the old per-step behaviour. Subclasses may override this to defer encoding: accumulate each operator’s effect into a ProcessingContext and return it; Pipeline will call ProcessingContext.materialize() at the next processor boundary or at the end of the pipeline.
- Parameters:
steps – Ordered list of tagged operator callables in this run.
asset_or_context – Input asset (or live context from a preceding run of the same processor).
- Returns:
Processed Asset or a live ProcessingContext.
Added in version 1.0.
- abstractmethod read(file: IO) Asset[source]
Returns an Asset object whose essence is identical to the contents of the specified file.
- Parameters:
file (IO) – file-like object to be read
- Returns:
Asset with essence
- Return type:
Asset
- Raises:
UnsupportedFormatError – if the specified data format is not supported
- class madam.core.ShelveStorage(path: Path | str)[source]
Bases:
AssetStorage[str]
Represents a persistent storage backend for Asset objects. Asset keys must be strings.
ShelveStorage uses a file on the file system to serialize Assets.
- __init__(path: Path | str)[source]
Initializes a new ShelveStorage with the specified path.
- Parameters:
path (pathlib.Path or str) – File system path where the data should be stored
- clear() None. Remove all items from D.
- filter(**kwargs: Any) Iterable[AssetKey]
Returns a sequence of asset keys whose assets match the criteria that are specified by the passed arguments.
- Parameters:
**kwargs – Criteria defined as keys and values
- Returns:
Sequence of asset keys
- Return type:
Iterable
- filter_by_tags(*tags: str) Iterable[AssetKey]
Returns a set of all asset keys in this storage that have at least the specified tags.
- Parameters:
*tags – Mandatory tags of an asset to be included in result
- Returns:
Keys of the assets whose tags are a superset of the specified tags
- Return type:
Iterable
- get(k[, d]) D[k] if k in D, else d. d defaults to None.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
- popitem() (k, v), remove and return some (key, value) pair
as a 2-tuple; but raise KeyError if D is empty.
- setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
- update([E, ]**F) None. Update D from mapping/iterable E and F.
If E is present and has a .keys() method, does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v
- values() an object providing a view on D's values
- exception madam.core.TransientOperatorError(*args)[source]
Bases:
OperatorError
Raised when an operator fails due to a temporary condition (e.g. OOM, disk full) that may succeed on retry.
Added in version 0.23.
- __init__(*args)
Initializes a new OperatorError.
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
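The split between transient and permanent errors exists so that callers can decide whether a retry is worthwhile. The sketch below shows one way a caller might use it. The exception classes are redefined locally so the example runs without madam, and run_with_retry is a hypothetical helper, not part of the library.

```python
class OperatorError(Exception):
    """Local stand-in mirroring the hierarchy described above."""


class TransientOperatorError(OperatorError):
    pass


class PermanentOperatorError(OperatorError):
    pass


def run_with_retry(operation, attempts=3):
    """Retry only on transient failures; permanent ones propagate at once."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except TransientOperatorError:
            if attempt == attempts:
                raise  # out of attempts, give up


calls = {"flaky": 0, "broken": 0}


def flaky():
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise TransientOperatorError("disk full")
    return "done"


def broken():
    calls["broken"] += 1
    raise PermanentOperatorError("invalid codec")


result = run_with_retry(flaky)
permanent_failed = False
try:
    run_with_retry(broken)
except PermanentOperatorError:
    permanent_failed = True
```

The flaky operation succeeds on its third call, while the broken one is attempted exactly once.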
- exception madam.core.UnsupportedFormatError(*args)[source]
Bases:
PermanentOperatorError
Represents an error that is raised whenever file content with unknown type is encountered.
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- madam.core.operator(function: Callable[[Concatenate[Any, Asset, _P]], Asset]) Callable[[Concatenate[Any, _P]], Callable[[Asset], Asset]][source]
Decorator function for methods that process assets.
Usually, it will be used with operations in a Processor implementation to make the methods configurable before applying the method to an asset.
Only keyword arguments are allowed for configuration.
Example for using a decorated convert method:
convert_to_opus = processor.convert(mime_type='audio/opus')
convert_to_opus(asset)
- Parameters:
function – Method to decorate
- Returns:
Configurable method
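The configure-first, apply-later calling pattern can be reproduced with a short decorator. This is a minimal sketch of the pattern only, not the library's operator(): the decorator name configurable, the toy processor, and the use of strings as assets are assumptions made for illustration.

```python
import functools


def configurable(function):
    """Turn method(self, asset, **options) into method(**options) -> callable(asset)."""
    @functools.wraps(function)
    def configure(self, **options):  # keyword-only configuration
        def configured(asset):
            return function(self, asset, **options)
        return configured
    return configure


class ToyProcessor:
    @configurable
    def convert(self, asset, suffix=""):
        # Strings stand in for assets in this sketch.
        return asset.upper() + suffix


processor = ToyProcessor()
shout = processor.convert(suffix="!")  # configure once
converted = [shout(name) for name in ("abc", "def")]  # apply to many assets
```

Separating configuration from application means a configured operator is an ordinary one-argument callable, which is the shape that Pipeline.add() accepts.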