Writing Extensions: Sources, Sinks, and Kernels

How to extend runex's capability/channel edge without editing the core. This is the authoring guide for Python-side extensions:

  • inbound channels: DataSourceSpec
  • outbound channels: SinkSpec
  • effect helpers: KERNELS

If you are modeling business rules in .scm, read ontology-authoring.md instead. This doc is for the Python code that sits outside the pure ontology and bridges to the outside world.


TL;DR

Drop Python files into:

text
~/.runex/extensions/
  kernels/
    my_kernels.py      # KERNELS = {"name": fn, ...}
  sources/
    my_source.py       # SPEC = DataSourceSpec(...)
  sinks/
    my_sink.py         # SPEC = SinkSpec(...)

Then open runex normally:

python
from runex.ontology import Ontology

o = Ontology.open("data/data.db")   # discovers kernels and channel specs
man = o.manifest()

What becomes visible:

  • manifest().data.kernels
  • manifest().data.datasources
  • manifest().data.sinks

What does not happen automatically:

  • open() only discovers a SinkSpec; it does not build or register a sink instance
  • the configured sink instance must still be registered on the engine with o.register_sink(...)

Mental model

runex's core owns capabilities, not channels.

  • A capability is a generic power such as file I/O, HTTP, sqlite, or subprocess. The core sanctions and injects it.
  • A channel is one concrete external system built on top of those powers: an Obsidian vault, a WeChat export, a NocoDB table, a future Notion workspace.

The split is strict:

  • channel modules declare requires=("file", "http", ...)
  • the registry injects capability handles into the channel's _factory
  • channel code performs zero direct I/O outside those handles

That is the seam enforced by tests/test_capability_seam.py.


Trust boundary

Extension discovery executes Python from ~/.runex/extensions/{kernels,sources,sinks}.

That is safe only under the current single-user-local trust model: the extension directory must be owned by the same principal as the .db. Do not point a shared or hosted deployment at an extension directory you do not control.

See architecture.md and roadmap.md for the architectural rationale.


Discovery contract

The loader scans *.py files in each bucket, skipping files whose names start with _.

  • kernels/*.py must expose KERNELS: dict[str, Callable]
  • sources/*.py must expose SPEC: DataSourceSpec
  • sinks/*.py must expose SPEC: SinkSpec

Import errors are not swallowed. A broken extension fails fast and the author fixes it.

The implementation lives in src/runex/extensions.py.
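
The scanning rules above can be sketched as follows. This is not the code in src/runex/extensions.py: the helper names `load_module` and `scan_bucket`, the return shape, and the choice to raise AttributeError for a missing export are all assumptions made for illustration. Only the documented rules (scan *.py, skip names starting with _, let import errors propagate) come from this guide.

```python
from __future__ import annotations

import importlib.util
from pathlib import Path
from types import ModuleType


def load_module(path: Path) -> ModuleType:
    """Import one file by path; any exception propagates to the caller."""
    spec = importlib.util.spec_from_file_location(f"ext_{path.stem}", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # fail fast: no try/except here
    return module


def scan_bucket(bucket: Path, attr: str) -> dict[str, object]:
    """Hypothetical helper: map file stem -> the module's `attr` export."""
    found: dict[str, object] = {}
    if not bucket.is_dir():
        return found
    for path in sorted(bucket.glob("*.py")):
        if path.name.startswith("_"):
            continue  # files starting with "_" are never imported
        module = load_module(path)
        if not hasattr(module, attr):
            raise AttributeError(f"{path.name} must expose {attr}")
        found[path.stem] = getattr(module, attr)
    return found
```

Calling `scan_bucket(Path("~/.runex/extensions/kernels").expanduser(), "KERNELS")` would then return one entry per kernel module, and a module that raises at import time would stop the scan immediately.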


Source extensions

What a source does

A source turns some external raw input into one or more typed CanonicalItems.

The source module exports a single SPEC:

python
from __future__ import annotations

from pathlib import Path

from runex.adapters.base import CanonicalItem
from runex.adapters.registry import DataSourceSpec, ParamSpec, WatchSpec


class DemoSource:
    name = "demo-source"

    def __init__(self, *, fs):
        self._fs = fs

    def discover(self, source: str | Path):
        root = Path(source)
        for path in self._fs.glob(root, "*.txt"):
            text = self._fs.read_text(path, encoding="utf-8")
            yield CanonicalItem(
                source_uri=f"demo://{path.name}",
                source_type="demo",
                supertag="Note",
                name=path.stem,
                fields={
                    "标题": ("text", path.stem),
                    "正文": ("longtext", text),
                },
            )


SPEC = DataSourceSpec(
    key="demo-source",
    summary="demo text files -> Note nodes",
    supertag="Note",
    ontologies=("note_flow.scm",),
    params=(
        ParamSpec("path", "path", True, "directory to ingest"),
    ),
    watch=WatchSpec(kind="file", glob="*.txt"),
    requires=("file",),
    _factory=lambda ctx, caps: DemoSource(fs=caps["file"]),
)

DataSourceSpec fields

  • key: stable source identifier used by CLI / manifest / products
  • summary: short human-readable description
  • supertag: primary object type this source emits
  • ontologies: .scm bundles that make the source's output live
  • params: declarative input contract surfaced in manifest()
  • watch: watchability contract for incremental ingest
  • requires: capability keys to inject
  • _factory(ctx, caps): builds the real adapter instance

Rules for source authors

  • Keep module import cheap. Heavy work belongs in discover(), not at import time.
  • The adapter should only do raw -> CanonicalItem.
  • Do not import requests, sqlite3, subprocess, or similar libraries directly in a split channel. Use injected capabilities.
  • If the source is watchable, declare it honestly in WatchSpec.
  • If the source needs ontology bundles to behave correctly, list them in ontologies so a product shell can auto-load them.

Sink extensions

What a sink does

A sink is the outbound mirror of a source:

text
Store node -> CanonicalItem projection -> SinkAdapter.writeback(item)

The sink module exports SPEC: SinkSpec:

python
from __future__ import annotations

from runex.adapters.registry import ParamSpec, SinkSpec


class DemoSink:
    name = "demo-sink"

    def __init__(self, *, http, endpoint: str, token: str):
        self._http = http
        self.endpoint = endpoint
        self.token = token

    def writeback(self, item):
        body = {
            "fields": {
                name: value
                for name, (_vtype, value) in item.fields.items()
                if not name.startswith("_")
            }
        }
        return self._http.post_json(
            self.endpoint,
            body,
            headers={"Authorization": f"Bearer {self.token}"},
        )


SPEC = SinkSpec(
    key="demo-sink",
    summary="store node -> remote record POST",
    requires=("http",),
    params=(
        ParamSpec("endpoint", "str", True, "HTTP endpoint"),
        ParamSpec("token", "str", True, "API token"),
    ),
    _factory=lambda ctx, caps: DemoSink(
        http=caps["http"],
        endpoint=ctx["endpoint"],
        token=ctx["token"],
    ),
)

SinkSpec fields

  • key: the name an ECA action uses in (writeback "key")
  • summary: short description surfaced via manifest()
  • requires: capability keys to inject
  • params: the config inputs needed to build the sink
  • _factory(ctx, caps): builds the configured sink instance

Discoverable spec vs configured instance

This distinction matters:

  • discovery gives runex a spec
  • writeback at runtime needs a configured sink object

Example:

python
from runex.adapters import registry as ds
from runex.ontology import Ontology

o = Ontology.open("data.db")
spec = ds.sink_get("demo-sink")
sink = spec.build({"endpoint": "https://api.example.com/x", "token": "T"})
o.register_sink("demo-sink", sink)

Until register_sink(...) has been called, (writeback "demo-sink") fails with a "sink not registered" error.


Kernel extensions

Kernels are named Python helpers callable from DSL effects:

python
from __future__ import annotations

import re


def extract_bilibili_url(text: str) -> str | None:
    m = re.search(r"https?://(?:www\.)?bilibili\.com/video/\S+", text or "")
    return m.group(0) if m else None


KERNELS = {
    "extract-bilibili-url": extract_bilibili_url,
}
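
Because a kernel is a plain function, it can be exercised without the engine. The snippet below repeats the kernel so that it runs on its own; the sample strings are made up for illustration.

```python
import re


def extract_bilibili_url(text):
    # Same regex as the kernel above; `text or ""` tolerates None input.
    m = re.search(r"https?://(?:www\.)?bilibili\.com/video/\S+", text or "")
    return m.group(0) if m else None


KERNELS = {"extract-bilibili-url": extract_bilibili_url}

fn = KERNELS["extract-bilibili-url"]
hit = fn("watch https://www.bilibili.com/video/BV1xx411c7mD later")
miss = fn("no link here")
empty = fn(None)
print(hit, miss, empty)
```

Note that `\S+` matches up to the next whitespace, so punctuation that directly follows the URL in running text would be captured too.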

Use a kernel when:

  • the operation is effect-side helper logic
  • the business .scm should name the capability explicitly
  • the logic does not belong in a generic built-in kernel

Do not use kernels for:

  • model declarations (.scm is the right place)
  • direct data plumbing between Python modules that should stay typed
  • anything that should instead be a sanctioned capability

Remember:

  • kernels are callable only via (call-kernel ...)
  • guards cannot call them
  • products should register any externally dirty kernels (those with side effects outside the store) deliberately

Blocking kernels and the Signal-Then-Work pattern

The problem

The reactive bus is synchronous. When a store mutation fires an action, the effect DSL runs on the same thread as the original store.set_field call. A kernel that does heavy work — yt-dlp subprocess, SiliconFlow ASR, network inference — blocks that thread for the entire duration. In a Tray App or any interactive host, this freezes the UI for minutes.

How runex surfaces this

When you register a kernel that does heavy I/O, mark it blocking=True:

python
ontology.register_kernel("transcribe-url", transcribe_fn, blocking=True)

This flag appears in manifest().data.kernels and list_kernels():

json
{"name": "transcribe-url", "blocking": true}

runex ontology check runs a static AST walk over every action's effect. If it finds (call-kernel "transcribe-url" ...) in an effect body, it emits:

WARNING  thought_transcribe  calls blocking kernel  transcribe-url  directly
         in its effect — use Signal-Then-Work pattern

--strict turns this warning into a non-zero exit, making it a CI gate.
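
The idea behind that check can be sketched in a few lines. This is an illustration of the technique, not the checker that ships with runex: it assumes the effect has already been parsed into nested Python lists, and the function name `blocking_calls` is hypothetical.

```python
from __future__ import annotations

from typing import Iterator

BLOCKING = {"transcribe-url"}  # kernels registered with blocking=True


def blocking_calls(expr: object) -> Iterator[str]:
    """Yield each blocking kernel named in a (call-kernel "name" ...) form."""
    if not isinstance(expr, list) or not expr:
        return  # atoms (strings, numbers) and empty forms have no calls
    if (expr[0] == "call-kernel" and len(expr) > 1
            and isinstance(expr[1], str) and expr[1] in BLOCKING):
        yield expr[1]
    for child in expr:
        yield from blocking_calls(child)  # recurse into nested forms


# Assumed parsed shape of an effect that calls the kernel directly.
effect = ["begin",
          ["set-field", "视频转录", "longtext",
           ["call-kernel", "transcribe-url", ["field", "视频来源"]]],
          ["transition", "done"]]

warnings = [f"calls blocking kernel {name} directly"
            for name in blocking_calls(effect)]
print(warnings)
```

A walk like this is purely syntactic: it can see a literal kernel name in the effect body, but not a blocking call hidden inside another kernel.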

The correct pattern: Signal-Then-Work

Never call a blocking kernel directly inside an action effect. Instead, split the work into three phases:

Phase 1 — Signal (action effect, fast and atomic)

The action's only job is to record intent. It writes a status field and transitions state. This finishes in milliseconds.

scheme
; thought_detect_url.scm
(define-action thought_detect_url
  :machine Thought
  :from    (new)
  :trigger (field-set "内容")
  ; Guards cannot call kernels, so the guard only checks the field;
  ; extract-url runs in the effect.
  :guard   (field-nonempty? "内容")
  :effect  (begin
              (set-field "视频来源"   "text" (call-kernel "extract-url" (field "内容")))
              (set-field "转录状态"   "text" "pending")
              (transition "pending_transcription")))

Phase 2 — Work (external daemon, not an action)

A separate long-lived process polls for nodes whose 转录状态 (transcription status) field is pending, runs the blocking kernel, and writes the result back. This process is not part of the ontology engine: it is an external worker that uses the store API directly.

python
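# daemon.py: runs independently, not inside the reactive bus
import time

from runex.ontology import Ontology

# transcribe_url is the slow, blocking function itself. The module name
# below is hypothetical; import it from wherever your code defines it.
from my_transcriber import transcribe_url

o = Ontology.open("data.db")

while True:
    # Nodes that Phase 1 marked as waiting for a transcript.
    pending = o.store.find_by_field("转录状态", "pending")
    for node_id in pending:
        transcript = transcribe_url(o.store.get_field(node_id, "视频来源"))
        # Writing 视频转录 is what triggers the Phase 3 action.
        o.store.set_field(node_id, "视频转录", "longtext", transcript)
        o.store.set_field(node_id, "转录状态", "text", "done")
    time.sleep(5)  # poll interval in seconds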

Phase 3 — React (action effect, fast, triggers on the written result)

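A second action fires reactively when 视频转录 (the transcript field) is set. Its effect runs a fast operation (an LLM call of about 10 s is acceptable; a multi-minute subprocess is not). The example below only sets a field and then transitions to done, so a repeated trigger overwrites the field instead of duplicating data. If your Phase 3 effect creates nodes, use upsert-by-identity rather than create-node so that repeated triggers stay idempotent.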

scheme
; thought_analyze_transcript.scm
(define-action thought_analyze_transcript
  :machine Thought
  :from    (pending_transcription)
  :trigger (field-set "视频转录")
  :guard   (field-nonempty? "视频转录")
  :effect  (begin
              (set-field "AI洞察" "longtext"
                (call-kernel "analyze-relevance"
                  (field "视频转录")
                  (field "内容")))
              (transition "done")))

Why this is the right model

This maps directly to well-established concurrency patterns:

| Pattern | Phase 1 | Phase 2 | Phase 3 |
|---|---|---|---|
| Erlang GenServer + spawn_link | handle_call returns {reply, ok} | spawned process does work | sends result back to GenServer |
| Go channels | write to buffered channel | worker goroutine drains | writes back, select reads |
| Clojure STM + agent | alter ref immediately | agent does async work | swap! result in |

The atomic boundary (Phase 1 action effect) stays fast and pure. Long-running work lives outside the bus entirely. The bus reacts when the work is done.
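
The same three phases can be simulated in plain Python. This sketch swaps in an in-process queue and a worker thread for the polling daemon described above, and a dict for the store; every name in it (`signal`, `worker`, `react`, `slow_transcribe`) is hypothetical and none of it is runex API.

```python
import queue
import threading

store = {}            # node_id -> {field: value}; stand-in for the real store
work = queue.Queue()  # hand-off between the fast path and the worker


def signal(node_id, url):
    """Phase 1: record intent and return immediately."""
    store[node_id] = {"source": url, "status": "pending"}
    work.put(node_id)


def slow_transcribe(url):
    """Placeholder for the blocking call (subprocess, ASR, ...)."""
    return f"transcript of {url}"


def react(node_id):
    """Phase 3: fast follow-up once the result has been written."""
    store[node_id]["insight"] = store[node_id]["transcript"].upper()


def worker():
    """Phase 2: do the slow work off the caller's thread."""
    while True:
        node_id = work.get()
        if node_id is None:  # sentinel: shut down
            break
        node = store[node_id]
        node["transcript"] = slow_transcribe(node["source"])
        node["status"] = "done"
        react(node_id)


t = threading.Thread(target=worker)
t.start()
signal("n1", "https://example.com/v/1")  # returns without waiting for the work
work.put(None)
t.join()
print(store["n1"]["status"], store["n1"]["insight"])
```

The caller of `signal` never waits on `slow_transcribe`, which is the property the Phase 1 action needs to preserve.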

Idempotency is your responsibility in Phase 3

If Phase 3 uses create-node, a daemon retry or a duplicate bus wakeup will create duplicate nodes. Use upsert-by-identity with a stable natural key, or design the machine so the state transition in Phase 1 prevents re-entry. runex ontology check cannot detect idempotency violations — this is an authoring discipline, not a lint rule.
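
The difference between the two forms is easy to see with a toy store. The Python functions below only mimic the semantics that the names create-node and upsert-by-identity suggest; their signatures are assumptions, not the DSL's.

```python
nodes = []  # stand-in store: a list of dict nodes


def create_node(kind, fields):
    """Always appends, so a repeated trigger duplicates the node."""
    nodes.append({"kind": kind, **fields})


def upsert_by_identity(kind, key_field, fields):
    """Update the node whose natural key matches, else create it."""
    for node in nodes:
        if node["kind"] == kind and node.get(key_field) == fields[key_field]:
            node.update(fields)
            return node
    node = {"kind": kind, **fields}
    nodes.append(node)
    return node


insight = {"source_uri": "demo://a.txt", "summary": "v1"}

create_node("Insight", insight)
create_node("Insight", insight)  # simulated retry
dupes = len(nodes)               # two nodes for one logical insight

nodes.clear()
upsert_by_identity("Insight", "source_uri", insight)
upsert_by_identity("Insight", "source_uri", {**insight, "summary": "v2"})
print(dupes, len(nodes), nodes[0]["summary"])
```

The stable natural key here is `source_uri`; any field that uniquely identifies the logical record would serve the same purpose.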


Capability injection

Capability keys are declared in requires=(...).

Today the sanctioned keys are:

  • file
  • command
  • sqlite
  • http

They are constructed by src/runex/adapters/capabilities.py.

Authoring rule:

  • if a channel needs a power the registry does not sanction, adding that power is a core design decision, not an extension hack

Do not smuggle extra I/O through ad hoc imports.
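
To make the injection step concrete, here is a minimal sketch of how a registry could hand a factory exactly the capabilities its spec declares. `ToySpec` and `build` are hypothetical stand-ins, not the classes in runex.adapters.registry, and rejecting unsanctioned keys with ValueError is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable

SANCTIONED = ("file", "command", "sqlite", "http")


@dataclass(frozen=True)
class ToySpec:
    """Stand-in for DataSourceSpec/SinkSpec with only the fields used here."""
    key: str
    requires: tuple[str, ...]
    factory: Callable[[dict[str, Any], dict[str, Any]], Any]


def build(spec: ToySpec, ctx: dict[str, Any], handles: dict[str, Any]) -> Any:
    """Pass the factory only the capabilities the spec declared."""
    unknown = [k for k in spec.requires if k not in SANCTIONED]
    if unknown:
        raise ValueError(f"{spec.key}: unsanctioned capability {unknown}")
    caps = {k: handles[k] for k in spec.requires}
    return spec.factory(ctx, caps)


handles = {k: object() for k in SANCTIONED}  # placeholder capability handles
spec = ToySpec("demo", ("http",), lambda ctx, caps: sorted(caps))
built = build(spec, {}, handles)
print(built)
```

Because the factory receives only what `requires` lists, a channel that reaches for an undeclared power fails at build time instead of quietly doing its own I/O.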


Testing strategy

Use three layers, in this order:

1. Unit test the extension module

  • source: raw input -> expected CanonicalItem
  • sink: projected fields -> expected request body
  • kernel: function args -> expected return value
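
A source unit test might look like the sketch below. It runs without runex installed, so `Item` is a stand-in for CanonicalItem, `DemoSource` is a trimmed copy of the example earlier in this guide, and `FakeFs` is a hypothetical in-memory replacement for the injected file capability that mirrors only the two calls that example uses (`glob` and `read_text`).

```python
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class Item:
    """Stand-in for CanonicalItem with the fields used in this test."""
    source_uri: str
    name: str
    fields: dict = field(default_factory=dict)


class FakeFs:
    """In-memory replacement for the injected "file" capability."""

    def __init__(self, files: dict[str, str]):
        self._files = files

    def glob(self, root: Path, pattern: str):
        suffix = pattern.lstrip("*")  # "*.txt" -> ".txt"
        return [root / n for n in sorted(self._files) if n.endswith(suffix)]

    def read_text(self, path: Path, encoding: str = "utf-8") -> str:
        return self._files[path.name]


class DemoSource:
    def __init__(self, *, fs):
        self._fs = fs

    def discover(self, source):
        for path in self._fs.glob(Path(source), "*.txt"):
            text = self._fs.read_text(path, encoding="utf-8")
            yield Item(f"demo://{path.name}", path.stem,
                       {"正文": ("longtext", text)})


def test_discover_emits_one_item_per_txt():
    fs = FakeFs({"a.txt": "hello", "b.md": "ignored"})
    items = list(DemoSource(fs=fs).discover("/vault"))
    assert [i.source_uri for i in items] == ["demo://a.txt"]
    assert items[0].fields["正文"] == ("longtext", "hello")
```

The test never touches the disk, which is only possible because the adapter does all of its I/O through the injected handle.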

2. Registry/discovery test

Point discovery at a temp dir and assert the spec appears:

  • discover_sources(base=tmp_path / "ext")
  • discover_sinks(base=tmp_path / "ext")
  • discover_kernels(engine, base=tmp_path / "ext")

See tests/test_extension_discovery.py.

3. End-to-end ontology test

Wire the extension into a real Ontology and assert the business cascade:

  • ingest path for a source
  • reactive state machine/action chain
  • writeback path for a sink

Prefer a stub-at-the-seam test for routine CI. Use live dogfood sparingly and keep it minimal and reversible.
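
A stub-at-the-seam test for a sink replaces only the injected capability. In the sketch below, `FakeHttp` is a hypothetical recorder that mimics the `post_json` call used by the sink example earlier in this guide, `DemoSink` is a trimmed copy of that example, and the item is a SimpleNamespace rather than a real CanonicalItem.

```python
from types import SimpleNamespace


class FakeHttp:
    """Records calls instead of touching the network."""

    def __init__(self):
        self.calls = []

    def post_json(self, url, body, headers=None):
        self.calls.append((url, body, headers or {}))
        return {"ok": True}


class DemoSink:
    """Trimmed copy of the sink shown earlier in this guide."""

    def __init__(self, *, http, endpoint, token):
        self._http, self.endpoint, self.token = http, endpoint, token

    def writeback(self, item):
        # Drop the value-type tag and skip "_"-prefixed internal fields.
        body = {"fields": {n: v for n, (_t, v) in item.fields.items()
                           if not n.startswith("_")}}
        return self._http.post_json(
            self.endpoint, body,
            headers={"Authorization": f"Bearer {self.token}"})


http = FakeHttp()
sink = DemoSink(http=http, endpoint="https://api.example.com/x", token="T")
item = SimpleNamespace(fields={"title": ("text", "Hi"), "_id": ("text", "n1")})
result = sink.writeback(item)
url, body, headers = http.calls[0]
print(url, body, headers)
```

Asserting on `http.calls` checks the exact request the sink would send, including that the internal `_id` field stays out of the outbound body.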


Common mistakes

Importing I/O libraries directly in a split channel

Bad:

python
import requests

Good:

python
SPEC = SinkSpec(..., requires=("http",), ...)

and use caps["http"] / self._http.

Doing heavy work at import time

Bad:

  • network request at module import
  • sqlite connection at module import
  • scanning a whole filesystem tree at module import

Good:

  • module import defines SPEC
  • _factory(...) builds the adapter
  • discover() / writeback() does the real work

Treating discovery as registration

SinkSpec discovery makes the sink visible in manifest(). It does not automatically register a configured sink instance for runtime writeback.

Leaking internal fields outbound

When projecting to external systems, skip _-prefixed internal fields unless you deliberately want them.


How this connects to the next e2e environment

If you are building a repository-local end-to-end environment:

  • put synthetic source/sink fixtures under a temp extension dir
  • keep the ontology bundles in config/ontology/
  • run the whole stack through Ontology.open() so discovery is exercised
  • prefer a local stub service or disposable table for external writes

That setup validates the same contract a real extension will use in production, without teaching the tests special rules.