API reference: crem.db

Programmatic database building — the Python equivalents of cremdb_create, cremdb_merge, and cremdb_add_prop. See Build a database (v1) and Fragment properties for guides.

db

Python API for CReM fragment database management.

All three database operations (creation, merging, property annotation) are available as plain Python functions importable from this module:

from crem.db import create_db, merge_dbs, add_fragment_props

create_db

create_db(input: Union[PathLike, Iterable[str]], output: PathLike, set_name: Union[str, Dict[str, Optional[set]]], radii=(1, 2, 3, 4, 5), *, ncpu: int = 1, max_heavy_atoms: int = 15, keep_stereo: bool = False, mode: int = 0, chunk_size: int = 100, flush_every: int = 100, shard_size: Optional[int] = None, parallel_shards: int = 1, frag_mode: str = 'both_optimal', verbose: bool = True, sep: Optional[str] = None, processed_chunks: Optional[PathLike] = None, force_zstd: bool = False, log_every: Optional[int] = None, prefetch: int = 4, timings: bool = False, merge_parallel: Optional[int] = None, fragment_error_log: bool = False) -> None

Create or extend a v1 CReM fragment database.

Calling on an existing database is safe and additive: _ensure_schema uses CREATE TABLE IF NOT EXISTS and incremental ALTER TABLE, so any new set names or radii are added and existing data is preserved.

Parameters:
  • input (Union[PathLike, Iterable[str]]) –

    path to a SMILES file (str / Path) or an iterable of "SMILES [ID]" strings (one molecule per item).

  • output (PathLike) –

    path to the output SQLite database.

  • set_name (Union[str, Dict[str, Optional[set]]]) –

    a single set name (str), or a dict mapping each set name to either None (all molecules) or a set of molecule IDs that belong to that set.

  • radii

    fragment radii to build (default 1–5).

  • ncpu (int, default: 1 ) –

    worker processes.

  • max_heavy_atoms (int, default: 15 ) –

    maximum heavy atoms in a core fragment.

  • keep_stereo (bool, default: False ) –

    preserve stereocentres in env/core SMILES.

  • mode (int, default: 0 ) –

    fragmentation mode — 0 all atoms, 1 heavy only, 2 H only.

  • chunk_size (int, default: 100 ) –

    input lines per worker task.

  • flush_every (int, default: 100 ) –

    chunks to accumulate before each DB flush.

  • shard_size (Optional[int], default: None ) –

    max input structures per shard DB (None = single DB). Incompatible with parallel_shards > 1.

  • parallel_shards (int, default: 1 ) –

    when > 1, run N shard builders concurrently, each fragmenting a stride of the input. CPUs from ncpu are split evenly across them. Shard DBs live in <output>.parts/ and are merged into output via a parallel binary-tree reduction. Default 1 (single-process build).

  • frag_mode (str, default: 'both_optimal' ) –

    fragmentation source: 'acyclic', 'ring', 'both', 'ring_optimal', or 'both_optimal'. Default 'both_optimal' matches cremdb_create.

  • verbose (bool, default: True ) –

    print progress and statistics to stdout/stderr.

  • sep (Optional[str], default: None ) –

    input delimiter (None = whitespace).

  • processed_chunks (Optional[PathLike], default: None ) –

    path to a processed-chunks file for resumable non-parallel builds from file input. Ignored when input is an iterable. Also ignored for parallel_shards > 1; parallel builds manage per-shard processed-chunk files internally.

  • force_zstd (bool, default: False ) –

    force zstd input decompression regardless of file suffix.

  • log_every (Optional[int], default: None ) –

    print a progress line every N chunks (None = silent).

  • prefetch (int, default: 4 ) –

    in-flight task batches per worker.

  • timings (bool, default: False ) –

    print per-flush timing breakdown to stderr.

  • merge_parallel (Optional[int], default: None ) –

    max concurrent pair-merges for parallel_shards > 1.

  • fragment_error_log (bool, default: False ) –

    write defensive fragment validation issues to <output>.errors. If false, issues are written to stderr.

Source code in crem/db.py
def create_db(
    input: Union[PathLike, Iterable[str]],
    output: PathLike,
    set_name: Union[str, Dict[str, Optional[set]]],
    radii=(1, 2, 3, 4, 5),
    *,
    ncpu: int = 1,
    max_heavy_atoms: int = 15,
    keep_stereo: bool = False,
    mode: int = 0,
    chunk_size: int = 100,
    flush_every: int = 100,
    shard_size: Optional[int] = None,
    parallel_shards: int = 1,
    frag_mode: str = 'both_optimal',
    verbose: bool = True,
    sep: Optional[str] = None,
    processed_chunks: Optional[PathLike] = None,
    force_zstd: bool = False,
    log_every: Optional[int] = None,
    prefetch: int = 4,
    timings: bool = False,
    merge_parallel: Optional[int] = None,
    fragment_error_log: bool = False,
) -> None:
    """Create or extend a v1 CReM fragment database.

    Calling on an existing database is safe and additive: ``_ensure_schema``
    uses ``CREATE TABLE IF NOT EXISTS`` and incremental ``ALTER TABLE``, so
    any new set names or radii are added and existing data is preserved.

    :param input: path to a SMILES file (``str`` / ``Path``) **or** an iterable
        of ``"SMILES [ID]"`` strings (one molecule per item).
    :param output: path to the output SQLite database.
    :param set_name: a single set name (``str``), or a ``dict`` mapping each set
        name to either ``None`` (all molecules) or a ``set`` of molecule IDs
        that belong to that set.
    :param radii: fragment radii to build (default 1–5).
    :param ncpu: worker processes.
    :param max_heavy_atoms: maximum heavy atoms in a core fragment.
    :param keep_stereo: preserve stereocentres in env/core SMILES.
    :param mode: fragmentation mode — 0 all atoms, 1 heavy only, 2 H only.
    :param chunk_size: input lines per worker task.
    :param flush_every: chunks to accumulate before each DB flush.
    :param shard_size: max input structures per shard DB (``None`` = single DB).
        Incompatible with ``parallel_shards > 1``.
    :param parallel_shards: when > 1, run N shard builders concurrently, each
        fragmenting a stride of the input. CPUs from ``ncpu`` are split evenly
        across them. Shard DBs live in ``<output>.parts/`` and are merged into
        ``output`` via a parallel binary-tree reduction. Default 1
        (single-process build).
    :param frag_mode: fragmentation source: ``'acyclic'``, ``'ring'``,
        ``'both'``, ``'ring_optimal'``, or ``'both_optimal'``. Default
        ``'both_optimal'`` matches ``cremdb_create``.
    :param verbose: print progress and statistics to stdout/stderr.
    :param sep: input delimiter (``None`` = whitespace).
    :param processed_chunks: path to a processed-chunks file for resumable
        non-parallel builds from file input. Ignored when ``input`` is an
        iterable. Also ignored for ``parallel_shards > 1``; parallel builds
        manage per-shard processed-chunk files internally.
    :param force_zstd: force zstd input decompression regardless of file suffix.
    :param log_every: print a progress line every N chunks (``None`` = silent).
    :param prefetch: in-flight task batches per worker.
    :param timings: print per-flush timing breakdown to stderr.
    :param merge_parallel: max concurrent pair-merges for ``parallel_shards > 1``.
    :param fragment_error_log: write defensive fragment validation issues to
        ``<output>.errors``. If false, issues are written to stderr.
    """
    if parallel_shards < 1:
        raise ValueError("parallel_shards must be >= 1")
    if parallel_shards > 1 and shard_size is not None:
        raise ValueError("parallel_shards > 1 is incompatible with shard_size")
    from crem.scripts.cremdb_create import run as _run, run_parallel_shards as _run_parallel

    tmp_input: Optional[str] = None
    tmp_ids: List[str] = []

    try:
        # --- resolve input ---------------------------------------------------
        if isinstance(input, (str, Path)):
            input_path = str(input)
            processed_chunks_arg = (
                str(processed_chunks) if processed_chunks is not None else None
            )
        else:
            processed_chunks_arg = None
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.smi', delete=False, encoding='utf-8'
            ) as fh:
                tmp_input = fh.name
                for line in input:
                    fh.write(line.rstrip('\n') + '\n')
            input_path = tmp_input

        # --- resolve set_name ------------------------------------------------
        if isinstance(set_name, str):
            set_name_arg = [set_name]
        elif isinstance(set_name, dict):
            set_name_arg = []
            for name, ids in set_name.items():
                set_name_arg.append(name)
                if ids is not None:
                    with tempfile.NamedTemporaryFile(
                        mode='w', suffix='.txt', delete=False, encoding='utf-8'
                    ) as fh:
                        tmp_ids.append(fh.name)
                        for mol_id in ids:
                            fh.write(str(mol_id) + '\n')
                    set_name_arg.append(tmp_ids[-1])
        else:
            raise TypeError("set_name must be a str or dict")

        if parallel_shards > 1:
            _run_parallel(
                input_path=input_path,
                output_db=str(output),
                set_name=set_name_arg,
                parallel_shards=parallel_shards,
                ncpu=ncpu,
                radii=list(radii),
                chunk_size=chunk_size,
                max_heavy_atoms=max_heavy_atoms,
                keep_stereo=keep_stereo,
                mode=mode,
                flush_every=flush_every,
                verbose=verbose,
                frag_mode=frag_mode,
                sep=sep,
                force_zstd=force_zstd,
                log_every=log_every,
                prefetch=prefetch,
                timings=timings,
                merge_parallel=merge_parallel,
                fragment_error_log=fragment_error_log,
            )
        else:
            _run(
                input_path=input_path,
                output_db=str(output),
                set_name=set_name_arg,
                radii=list(radii),
                chunk_size=chunk_size,
                max_heavy_atoms=max_heavy_atoms,
                keep_stereo=keep_stereo,
                mode=mode,
                flush_every=flush_every,
                shard_size=shard_size,
                ncpu=ncpu,
                verbose=verbose,
                frag_mode=frag_mode,
                sep=sep,
                processed_chunks=processed_chunks_arg,
                force_zstd=force_zstd,
                log_every=log_every,
                prefetch=prefetch,
                timings=timings,
                fragment_error_log=fragment_error_log,
            )

    finally:
        if tmp_input and os.path.exists(tmp_input):
            os.unlink(tmp_input)
        for p in tmp_ids:
            if os.path.exists(p):
                os.unlink(p)

merge_dbs

merge_dbs(target: PathLike, sources: List[PathLike], *, rebuild_index: bool = True, parallel: int = 1, verbose: bool = True) -> None

Merge source shard databases into target.

Parameters:
  • target (PathLike) –

    path to the target (base) database. Must already exist.

  • sources (List[PathLike]) –

    list of source shard database paths to merge in.

  • rebuild_index (bool, default: True ) –

    recreate covering indices on the target after merge.

  • parallel (int, default: 1 ) –

    when > 1, merge with binary-tree reduction using up to this many concurrent pair-merges per round. The target is treated as one of the contributors; the final survivor is moved back to target. Default 1 (serial).

  • verbose (bool, default: True ) –

    print per-shard progress.

Source code in crem/db.py
def merge_dbs(
    target: PathLike,
    sources: List[PathLike],
    *,
    rebuild_index: bool = True,
    parallel: int = 1,
    verbose: bool = True,
) -> None:
    """Merge source shard databases into ``target``.

    :param target: path to the target (base) database. Must already exist.
    :param sources: list of source shard database paths to merge in.
    :param rebuild_index: recreate covering indices on the target after merge.
    :param parallel: when > 1, merge with binary-tree reduction using up to this
        many concurrent pair-merges per round. The target is treated as one of
        the contributors; the final survivor is moved back to ``target``.
        Default 1 (serial).
    :param verbose: print per-shard progress.
    """
    if parallel < 1:
        raise ValueError("parallel must be >= 1")
    from crem.scripts.cremdb_merge import run as _run
    _run(
        target_path=str(target),
        source_paths=[str(s) for s in sources],
        rebuild_index=rebuild_index,
        verbose=verbose,
        parallel=parallel,
    )

add_fragment_props

add_fragment_props(db: PathLike, properties=_PROPS_DEFAULT, *, custom_props: Optional[Dict[str, Callable[[str], float]]] = None, table: str = 'frags', ncpu: int = 1, verbose: bool = False) -> None

Add molecular properties to a CReM fragment database.

Only rows with NULL property values are processed, so calling this function after adding new fragments fills only the newly added rows.

Built-in properties are computed on the frags table (core_smi column) using RDKit descriptors. Custom properties can target either 'frags' (core_smi) or 'frags_h' (H-replaced SMILES smi).

Parameters:
  • db (PathLike) –

    path to the fragment database.

  • properties

    built-in property names to compute ('mw', 'logp', 'rtb', 'tpsa', 'fcsp3'). Accepted values: if omitted, all built-ins are computed when custom_props is not given and no built-ins when custom_props is given (so add_fragment_props(db, custom_props={...}) adds only the custom columns, while the usual add_fragment_props(db) is unchanged); None or 'all' forces all built-ins (combine with custom_props to add both at once); a list/tuple computes that subset; and [] skips built-ins entirely.

  • custom_props (Optional[Dict[str, Callable[[str], float]]], default: None ) –

    mapping of {column_name: func(smi) -> value}. Picklable functions (named functions, functools.partial) use ncpu workers; non-picklable ones (lambdas, closures) are processed serially.

  • table (str, default: 'frags' ) –

    target table for custom_props'frags' or 'frags_h'.

  • ncpu (int, default: 1 ) –

    workers for built-in and picklable custom properties.

  • verbose (bool, default: False ) –

    print progress to stderr.

Source code in crem/db.py
def add_fragment_props(
    db: PathLike,
    properties=_PROPS_DEFAULT,
    *,
    custom_props: Optional[Dict[str, Callable[[str], float]]] = None,
    table: str = 'frags',
    ncpu: int = 1,
    verbose: bool = False,
) -> None:
    """Add molecular properties to a CReM fragment database.

    Only rows with ``NULL`` property values are processed, so calling this
    function after adding new fragments fills only the newly added rows.

    Built-in properties are computed on the ``frags`` table (``core_smi``
    column) using RDKit descriptors.  Custom properties can target either
    ``'frags'`` (``core_smi``) or ``'frags_h'`` (H-replaced SMILES ``smi``).

    :param db: path to the fragment database.
    :param properties: built-in property names to compute (``'mw'``, ``'logp'``,
        ``'rtb'``, ``'tpsa'``, ``'fcsp3'``). Accepted values: if **omitted**, all
        built-ins are computed when ``custom_props`` is not given and **no**
        built-ins when ``custom_props`` is given (so
        ``add_fragment_props(db, custom_props={...})`` adds only the custom
        columns, while the usual ``add_fragment_props(db)`` is unchanged);
        ``None`` or ``'all'`` forces all built-ins (combine with ``custom_props``
        to add both at once); a list/tuple computes that subset; and ``[]``
        skips built-ins entirely.
    :param custom_props: mapping of ``{column_name: func(smi) -> value}``.
        Picklable functions (named functions, ``functools.partial``) use
        ``ncpu`` workers; non-picklable ones (lambdas, closures) are processed
        serially.
    :param table: target table for ``custom_props`` — ``'frags'`` or
        ``'frags_h'``.
    :param ncpu: workers for built-in and picklable custom properties.
    :param verbose: print progress to stderr.
    """
    if table not in _TABLE_COLS:
        raise ValueError(f"table must be one of {list(_TABLE_COLS)}, got {table!r}")

    if properties is _PROPS_DEFAULT:
        compute_builtins = custom_props is None
        builtins_arg: Optional[List[str]] = None
    elif properties is None or properties == 'all':
        compute_builtins = True
        builtins_arg = None
    elif isinstance(properties, (list, tuple)) and properties:
        compute_builtins = True
        builtins_arg = list(properties)
    else:  # [] / () / explicit empty → skip built-ins
        compute_builtins = False
        builtins_arg = None

    if compute_builtins:
        from crem.scripts.cremdb_add_prop import run as _run
        _run(db_path=str(db), properties=builtins_arg, ncpu=ncpu, verbose=verbose)

    if custom_props:
        _add_custom_props(str(db), custom_props, table=table, ncpu=ncpu, verbose=verbose)