Writing Extensions: Sources, Sinks, and Kernels
How to extend runex's capability/channel edge without editing the core. This is the authoring guide for Python-side extensions:
- inbound channels: DataSourceSpec
- outbound channels: SinkSpec
- effect helpers: KERNELS
If you are modeling business rules in .scm, read ontology-authoring.md instead. This doc is for the Python code that sits outside the pure ontology and bridges to the outside world.
TL;DR
Drop Python files into:
text
~/.runex/extensions/
  kernels/
    my_kernels.py   # KERNELS = {"name": fn, ...}
  sources/
    my_source.py    # SPEC = DataSourceSpec(...)
  sinks/
    my_sink.py      # SPEC = SinkSpec(...)

Then open runex normally:
python
from runex.ontology import Ontology
o = Ontology.open("data/data.db") # discovers kernels and channel specs
man = o.manifest()

What becomes visible:
- manifest().data.kernels
- manifest().data.datasources
- manifest().data.sinks
What does not happen automatically:
- at open() time a SinkSpec is only discovered, not registered
- the configured sink instance must still be registered on the engine with o.register_sink(...)
Mental model
runex's core owns capabilities, not channels.
- A capability is a generic power such as file I/O, HTTP, sqlite, or subprocess. The core sanctions and injects it.
- A channel is one concrete external system built on top of those powers: an Obsidian vault, a WeChat export, a NocoDB table, a future Notion workspace.
The split is strict:
- channel modules declare requires=("file", "http", ...)
- the registry injects capability handles into the channel's _factory
- channel code performs zero direct I/O outside those handles
That is the seam enforced by tests/test_capability_seam.py.
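The injection step can be pictured with a minimal sketch. It assumes a registry that looks up each key listed in requires and passes only those handles to the factory. ToySpec, build_adapter, and the string stand-ins for capability handles are hypothetical names for illustration, not runex's actual classes.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Mapping


@dataclass(frozen=True)
class ToySpec:
    """Hypothetical stand-in for DataSourceSpec / SinkSpec."""
    key: str
    requires: tuple[str, ...]
    factory: Callable[[Mapping[str, Any], Mapping[str, Any]], Any]


def build_adapter(spec: ToySpec, ctx: Mapping[str, Any],
                  sanctioned: Mapping[str, Any]) -> Any:
    """Inject only the capabilities the spec declared, nothing more."""
    missing = [k for k in spec.requires if k not in sanctioned]
    if missing:
        raise KeyError(f"{spec.key}: unsanctioned capabilities {missing}")
    caps = {k: sanctioned[k] for k in spec.requires}
    return spec.factory(ctx, caps)


# Stand-in handles; a real registry would hand out capability objects.
SANCTIONED = {"file": "FILE-HANDLE", "http": "HTTP-HANDLE"}

spec = ToySpec(
    key="demo",
    requires=("file",),
    factory=lambda ctx, caps: {"path": ctx["path"], "caps": dict(caps)},
)
adapter = build_adapter(spec, {"path": "/tmp/in"}, SANCTIONED)
```

Because the factory receives only what requires names, a channel that declares ("file",) never sees the http handle, even though the registry holds one.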
Trust boundary
Extension discovery executes Python from ~/.runex/extensions/{kernels,sources,sinks}.
That is safe only under the current single-user-local trust model: the extension directory must be owned by the same principal as the .db. Do not point a shared or hosted deployment at an extension directory you do not control.
See architecture.md and roadmap.md for the architectural rationale.
Discovery contract
The loader scans *.py files in each bucket, skipping files whose names start with _.
- kernels/*.py must expose KERNELS: dict[str, Callable]
- sources/*.py must expose SPEC: DataSourceSpec
- sinks/*.py must expose SPEC: SinkSpec
Import errors are not swallowed. A broken extension fails fast and the author fixes it.
The implementation lives in src/runex/extensions.py.
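For orientation, here is a rough sketch of a loader that follows the contract above, built on the standard importlib machinery. It is not the code in src/runex/extensions.py. The function name load_bucket, the ext_ module-name prefix, and the choice to raise AttributeError when the export is missing are assumptions made for this example.

```python
from __future__ import annotations

import importlib.util
from pathlib import Path


def load_bucket(bucket: Path, attr: str) -> dict[str, object]:
    """Import every public *.py in `bucket` and collect its `attr` export."""
    found: dict[str, object] = {}
    if not bucket.is_dir():
        return found
    for path in sorted(bucket.glob("*.py")):
        if path.name.startswith("_"):
            continue  # files starting with "_" are skipped
        spec = importlib.util.spec_from_file_location(f"ext_{path.stem}", path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # import errors propagate on purpose
        if not hasattr(module, attr):
            raise AttributeError(f"{path.name} must expose {attr}")
        found[path.stem] = getattr(module, attr)
    return found
```

Calling load_bucket(Path.home() / ".runex" / "extensions" / "kernels", "KERNELS") would return one entry per kernel file. Any exception raised while importing a file surfaces to the caller unchanged, which mirrors the fail-fast rule.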
Source extensions
What a source does
A source turns some external raw input into one or more typed CanonicalItems.
The source module exports a single SPEC:
python
from __future__ import annotations

from pathlib import Path

from runex.adapters.base import CanonicalItem
from runex.adapters.registry import DataSourceSpec, ParamSpec, WatchSpec


class DemoSource:
    name = "demo-source"

    def __init__(self, *, fs):
        self._fs = fs  # injected "file" capability; all I/O goes through it

    def discover(self, source: str | Path):
        root = Path(source)
        for path in self._fs.glob(root, "*.txt"):
            text = self._fs.read_text(path, encoding="utf-8")
            yield CanonicalItem(
                source_uri=f"demo://{path.name}",
                source_type="demo",
                supertag="Note",
                name=path.stem,
                fields={
                    "标题": ("text", path.stem),  # title
                    "正文": ("longtext", text),   # body
                },
            )


SPEC = DataSourceSpec(
    key="demo-source",
    summary="demo text files -> Note nodes",
    supertag="Note",
    ontologies=("note_flow.scm",),
    params=(
        ParamSpec("path", "path", True, "directory to ingest"),
    ),
    watch=WatchSpec(kind="file", glob="*.txt"),
    requires=("file",),
    _factory=lambda ctx, caps: DemoSource(fs=caps["file"]),
)

DataSourceSpec fields
- key: stable source identifier used by CLI / manifest / products
- summary: short human-readable description
- supertag: primary object type this source emits
- ontologies: .scm bundles that make the source's output live
- params: declarative input contract surfaced in manifest()
- watch: watchability contract for incremental ingest
- requires: capability keys to inject
- _factory(ctx, caps): builds the real adapter instance
Rules for source authors
- Keep module import cheap. Heavy work belongs in discover(), not at import time.
- The adapter should only do raw -> CanonicalItem.
- Do not import requests, sqlite3, subprocess, or similar libraries directly in a split channel. Use injected capabilities.
- If the source is watchable, declare it honestly in WatchSpec.
- If the source needs ontology bundles to behave correctly, list them in ontologies so a product shell can auto-load them.
Sink extensions
What a sink does
A sink is the outbound mirror of a source:
text
Store node -> CanonicalItem projection -> SinkAdapter.writeback(item)

The sink module exports SPEC: SinkSpec:
python
from __future__ import annotations

from runex.adapters.registry import ParamSpec, SinkSpec


class DemoSink:
    name = "demo-sink"

    def __init__(self, *, http, endpoint: str, token: str):
        self._http = http
        self.endpoint = endpoint
        self.token = token

    def writeback(self, item):
        body = {
            "fields": {
                name: value
                for name, (_vtype, value) in item.fields.items()
                if not name.startswith("_")
            }
        }
        return self._http.post_json(
            self.endpoint,
            body,
            headers={"Authorization": f"Bearer {self.token}"},
        )


SPEC = SinkSpec(
    key="demo-sink",
    summary="store node -> remote record POST",
    requires=("http",),
    params=(
        ParamSpec("endpoint", "str", True, "HTTP endpoint"),
        ParamSpec("token", "str", True, "API token"),
    ),
    _factory=lambda ctx, caps: DemoSink(
        http=caps["http"],
        endpoint=ctx["endpoint"],
        token=ctx["token"],
    ),
)

SinkSpec fields
- key: the name an ECA action uses in (writeback "key")
- summary: short description surfaced via manifest()
- requires: capability keys to inject
- params: the config inputs needed to build the sink
- _factory(ctx, caps): builds the configured sink instance
Discoverable spec vs configured instance
This distinction matters:
- discovery gives runex a spec
- writeback at runtime needs a configured sink object
Example:
python
from runex.adapters import registry as ds
from runex.ontology import Ontology
o = Ontology.open("data.db")
spec = ds.sink_get("demo-sink")
sink = spec.build({"endpoint": "https://api.example.com/x", "token": "T"})
o.register_sink("demo-sink", sink)

Until register_sink(...) happens, (writeback "demo-sink") will fail with sink not registered.
Kernel extensions
Kernels are named Python helpers callable from DSL effects:
python
from __future__ import annotations

import re


def extract_bilibili_url(text: str) -> str | None:
    m = re.search(r"https?://(?:www\.)?bilibili\.com/video/\S+", text or "")
    return m.group(0) if m else None


KERNELS = {
    "extract-bilibili-url": extract_bilibili_url,
}

Use a kernel when:
- the operation is effect-side helper logic
- the business .scm should name the capability explicitly
- the logic does not belong in a generic built-in kernel
Do not use kernels for:
- model declarations (.scm is the right place)
- direct data plumbing between Python modules that should stay typed
- anything that should instead be a sanctioned capability
Remember:
- kernels are callable only via (call-kernel ...)
- guards cannot call them
- products should register any externally dirty kernels deliberately
Blocking kernels and the Signal-Then-Work pattern
The problem
The reactive bus is synchronous. When a store mutation fires an action, the effect DSL runs on the same thread as the original store.set_field call. A kernel that does heavy work — yt-dlp subprocess, SiliconFlow ASR, network inference — blocks that thread for the entire duration. In a Tray App or any interactive host, this freezes the UI for minutes.
How runex surfaces this
When you register a kernel that does heavy I/O, mark it blocking=True:
python
ontology.register_kernel("transcribe-url", transcribe_fn, blocking=True)

This flag appears in manifest().data.kernels and list_kernels():
json
{"name": "transcribe-url", "blocking": true}

runex ontology check runs a static AST walk over every action's effect. If it finds (call-kernel "transcribe-url" ...) in an effect body, it emits:

WARNING thought_transcribe calls blocking kernel transcribe-url directly
in its effect — use Signal-Then-Work pattern

--strict turns this warning into a non-zero exit, making it a CI gate.
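As an illustration of what such a static walk involves, the sketch below models a parsed effect as nested Python lists and reports every call-kernel form whose kernel name is in a blocking set. The list representation, the function names, and the shortened warning text are assumptions for this example. The real checker's internals are not shown in this guide.

```python
from __future__ import annotations

from typing import Iterator


def kernel_calls(expr) -> Iterator[str]:
    """Yield the kernel name of every (call-kernel NAME ...) form in expr."""
    if not isinstance(expr, list) or not expr:
        return  # atoms (strings) and empty forms contain no calls
    if expr[0] == "call-kernel" and len(expr) > 1 and isinstance(expr[1], str):
        yield expr[1]
    for child in expr:
        yield from kernel_calls(child)  # recurse into nested forms


def blocking_warnings(actions: dict[str, list], blocking: set[str]) -> list[str]:
    """One warning per (action, blocking kernel) pair found in an effect."""
    warnings = []
    for action, effect in actions.items():
        for name in sorted(set(kernel_calls(effect)) & blocking):
            warnings.append(f"{action} calls blocking kernel {name} directly")
    return warnings


# Hypothetical effect: (begin (set-field ... (call-kernel "transcribe-url" ...)) ...)
effect = [
    "begin",
    ["set-field", "transcript", "longtext",
     ["call-kernel", "transcribe-url", ["field", "url"]]],
    ["transition", "done"],
]
found = blocking_warnings({"thought_transcribe": effect}, {"transcribe-url"})
```

The walk is purely syntactic: it sees a nested call inside set-field as readily as a top-level one, but it cannot tell whether a kernel that was not marked blocking is slow in practice.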
The correct pattern: Signal-Then-Work
Never call a blocking kernel directly inside an action effect. Instead, split the work into three phases:
Phase 1 — Signal (action effect, fast and atomic)
The action's only job is to record intent. It writes a status field and transitions state. This finishes in milliseconds.
scheme
; thought_detect_url.scm
; fields: 内容 = content, 视频来源 = video source, 转录状态 = transcription status
(define-action thought_detect_url
  :machine Thought
  :from (new)
  :trigger (field-set "内容")
  ; guards cannot call kernels, so URL extraction happens in the effect
  :guard (field-nonempty? "内容")
  :effect (begin
    (set-field "视频来源" "text" (call-kernel "extract-url" (field "内容")))
    (set-field "转录状态" "text" "pending")
    (transition "pending_transcription")))

Phase 2 — Work (external daemon, not an action)
A separate long-lived process polls for nodes in 转录状态 = pending, runs the blocking kernel, and writes the result back. This process is not part of the ontology engine — it is an external worker that uses the store API directly.
python
# daemon.py: runs independently, not inside the reactive bus
import time

from runex.ontology import Ontology

# Hypothetical module name: import your own blocking transcription function.
from my_kernels import transcribe_url

o = Ontology.open("data.db")
while True:
    # 转录状态 = transcription status
    pending = o.store.find_by_field("转录状态", "pending")
    for node_id in pending:
        # 视频来源 = video source, 视频转录 = video transcript
        transcript = transcribe_url(o.store.get_field(node_id, "视频来源"))
        o.store.set_field(node_id, "视频转录", "longtext", transcript)
        o.store.set_field(node_id, "转录状态", "text", "done")
    time.sleep(5)

Phase 3 — React (action effect, fast, triggers on the written result)
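A second action fires reactively when 视频转录 is set. Its effect runs a fast operation (an LLM call of about 10 s is acceptable; a multi-minute subprocess is not). If the effect creates nodes, it should use upsert-by-identity rather than create-node so that repeated triggers stay idempotent. The example below only sets a field and transitions out of pending_transcription, so a repeated trigger no longer matches :from.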
scheme
; thought_analyze_transcript.scm
(define-action thought_analyze_transcript
  :machine Thought
  :from (pending_transcription)
  :trigger (field-set "视频转录")
  :guard (field-nonempty? "视频转录")
  :effect (begin
    (set-field "AI洞察" "longtext"
      (call-kernel "analyze-relevance"
        (field "视频转录")
        (field "内容")))
    (transition "done")))

Why this is the right model
This maps directly to well-established concurrency patterns:
| Pattern | Phase 1 | Phase 2 | Phase 3 |
|---|---|---|---|
| Erlang GenServer + spawn_link | handle_call returns {reply, ok, State} | spawned process does work | sends result back to GenServer |
| Go channels | write to buffered channel | worker goroutine drains | writes back, select reads |
| Clojure STM + agent | alter ref immediately | agent does async work | alter result in within a dosync |
The atomic boundary (Phase 1 action effect) stays fast and pure. Long-running work lives outside the bus entirely. The bus reacts when the work is done.
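The same three-phase shape can be sketched in plain Python with a queue and a worker thread. This is an in-process analogy only: in runex, Phase 2 is an external daemon and Phase 3 is an action fired by the bus, whereas here the worker calls react directly. The store dict, field names, and slow_transcribe are hypothetical stand-ins.

```python
import queue
import threading

store = {"n1": {"status": "new", "url": "https://example.com/v/1"}}
jobs = queue.Queue()


def slow_transcribe(url: str) -> str:
    """Hypothetical stand-in for a multi-minute blocking kernel."""
    return f"transcript of {url}"


def signal(node_id: str) -> None:
    """Phase 1: record intent and return immediately."""
    store[node_id]["status"] = "pending"
    jobs.put(node_id)


def react(node_id: str) -> None:
    """Phase 3: fast follow-up once the result has been written."""
    node = store[node_id]
    node["insight"] = node["transcript"].upper()
    node["status"] = "done"


def worker() -> None:
    """Phase 2: the slow work runs off the caller's thread."""
    while (node_id := jobs.get()) is not None:
        store[node_id]["transcript"] = slow_transcribe(store[node_id]["url"])
        react(node_id)


t = threading.Thread(target=worker)
t.start()
signal("n1")     # returns at once; the calling thread is never blocked
jobs.put(None)   # sentinel: stop the worker after the queue drains
t.join()
```

The caller only ever pays for a dict write and a queue put. Everything slow happens on the worker, and the follow-up runs only after the transcript exists.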
Idempotency is your responsibility in Phase 3
If Phase 3 uses create-node, a daemon retry or a duplicate bus wakeup will create duplicate nodes. Use upsert-by-identity with a stable natural key, or design the machine so the state transition in Phase 1 prevents re-entry. runex ontology check cannot detect idempotency violations — this is an authoring discipline, not a lint rule.
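The difference between the two write styles is easy to see in a toy model. ToyStore and its two methods are hypothetical and only mimic the semantics described above. They are not the runex store API or the DSL forms themselves.

```python
from __future__ import annotations


class ToyStore:
    """Hypothetical in-memory store used only to contrast the two styles."""

    def __init__(self) -> None:
        self.nodes: list[dict] = []

    def create_node(self, fields: dict) -> dict:
        node = dict(fields)
        self.nodes.append(node)  # always appends, so a retry duplicates
        return node

    def upsert_by_identity(self, key: str, fields: dict) -> dict:
        for node in self.nodes:
            if node.get(key) == fields[key]:
                node.update(fields)  # same identity: update in place
                return node
        return self.create_node(fields)


naive, safe = ToyStore(), ToyStore()
insight = {"source_uri": "demo://a", "text": "summary"}
for _ in range(2):  # simulate a daemon retry or duplicate wakeup
    naive.create_node(insight)
    safe.upsert_by_identity("source_uri", insight)
```

After the simulated duplicate wakeup, naive holds two copies of the same insight while safe holds one. The natural key (source_uri here) is what makes the second write a no-op update.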
Capability injection
Capability keys are declared in requires=(...).
Today the sanctioned keys are:
- file
- command
- sqlite
- http
They are constructed by src/runex/adapters/capabilities.py.
Authoring rule:
- if a channel needs a power the registry does not sanction, adding that power is a core design decision, not an extension hack
Do not smuggle extra I/O through ad hoc imports.
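One way to catch smuggled imports mechanically is an AST scan over the channel module's source. The sketch below is an assumption about how such a check could look, not a description of tests/test_capability_seam.py. The deny-list holds only the three libraries this guide names, and forbidden_imports is a hypothetical helper.

```python
import ast

# Deny-list taken from the libraries named in this guide; extend as needed.
FORBIDDEN = {"requests", "sqlite3", "subprocess"}


def forbidden_imports(source: str) -> set:
    """Return the forbidden top-level modules that `source` imports."""
    hits = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module:
            names = [node.module]
        else:
            continue
        # "a.b.c" counts as an import of the top-level package "a"
        hits.update(name.split(".")[0] for name in names)
    return hits & FORBIDDEN
```

A test could read each channel file, call forbidden_imports on its text, and assert the result is empty. A scan like this only sees static import statements, so dynamic imports would need a separate check.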
Testing strategy
Use three layers, in this order:
1. Unit test the extension module
- source: raw input -> expected CanonicalItem
- sink: projected fields -> expected request body
- kernel: function args -> expected return value
2. Registry/discovery test
Point discovery at a temp dir and assert the spec appears:
- discover_sources(base=tmp_path / "ext")
- discover_sinks(base=tmp_path / "ext")
- discover_kernels(engine, base=tmp_path / "ext")
See tests/test_extension_discovery.py.
3. End-to-end ontology test
Wire the extension into a real Ontology and assert the business cascade:
- ingest path for a source
- reactive state machine/action chain
- writeback path for a sink
Prefer a stub-at-the-seam test for routine CI. Use live dogfood sparingly and keep it minimal and reversible.
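A stub-at-the-seam test replaces the injected capability with a recording fake, so the sink's projection logic runs for real while nothing leaves the process. In the sketch below, FakeHttp, Item, and the inlined writeback function are hypothetical stand-ins that mirror the DemoSink example. The post_json signature is assumed from the way that example calls it.

```python
class FakeHttp:
    """Stub for the injected http capability: records calls, sends nothing."""

    def __init__(self):
        self.calls = []

    def post_json(self, url, body, headers=None):
        self.calls.append((url, body, headers or {}))
        return {"ok": True}


class Item:
    """Stand-in for CanonicalItem; only .fields is used here."""

    def __init__(self, fields):
        self.fields = fields


def writeback(http, endpoint, token, item):
    """Same projection as DemoSink.writeback, inlined to stay self-contained."""
    body = {
        "fields": {
            name: value
            for name, (_vtype, value) in item.fields.items()
            if not name.startswith("_")
        }
    }
    return http.post_json(
        endpoint, body, headers={"Authorization": f"Bearer {token}"}
    )


def test_writeback_skips_internal_fields():
    http = FakeHttp()
    item = Item({"title": ("text", "hello"), "_rev": ("text", "7")})
    writeback(http, "https://api.example.com/x", "T", item)
    url, body, headers = http.calls[0]
    assert url == "https://api.example.com/x"
    assert body == {"fields": {"title": "hello"}}
    assert headers == {"Authorization": "Bearer T"}


test_writeback_skips_internal_fields()
```

In a real suite the test would build the sink through its spec's factory with FakeHttp as caps["http"], which exercises the same seam the registry uses.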
Common mistakes
Importing I/O libraries directly in a split channel
Bad:
python
import requests

Good:
python
SPEC = SinkSpec(..., requires=("http",), ...)

and use caps["http"] / self._http.
Doing heavy work at import time
Bad:
- network request at module import
- sqlite connection at module import
- scanning a whole filesystem tree at module import
Good:
- module import defines SPEC
- _factory(...) builds the adapter
- discover() / writeback() does the real work
Treating discovery as registration
SinkSpec discovery makes the sink visible in manifest(). It does not automatically register a configured sink instance for runtime writeback.
Leaking internal fields outbound
When projecting to external systems, skip _-prefixed internal fields unless you deliberately want them.
How this connects to the next e2e environment
If you are building a repository-local end-to-end environment:
- put synthetic source/sink fixtures under a temp extension dir
- keep the ontology bundles in config/ontology/
- run the whole stack through Ontology.open() so discovery is exercised
- prefer a local stub service or disposable table for external writes
That setup validates the same contract a real extension will use in production, without teaching the tests special rules.