ROS2 Governance¶
The AutonomyOps ADK wraps every ROS2 operation in a governance layer that evaluates actions against an OPA policy before spawning any process.
First run (Community Edition)¶
The Community Edition distribution of the autonomy CLI dispatches ROS2
governance via autonomy run ros2.launch. (autonomy-ce-<os>-<arch> is
the release artifact prefix; once installed the binary is on PATH as
autonomy.) No orchestrator, fleet, or paid-tier deployment is required:
autonomy run \
--image ghcr.io/autonomyops/adk-ros2-runtime:local \
ros2.launch launch demo_robot arm_demo.launch.py
End-to-end walkthrough with image build, PASS markers, and bundle inspection: Robotics Quickstart.
Architecture overview¶
Both the CE dispatch path (autonomy run ros2.launch) and the paid-tier
subcommand (autonomy ros2 run) feed the same dual-path resolver and the
same pre-exec policy gate:
autonomy run ─┐
ros2.launch │
├─▶ runtime/dualpath.Resolve()
autonomy ros2 run ─┘ │
(paid tier) ▼
ModeContainer ──▶ docker run --rm <image> ros2 …
ModeNative ──▶ ros2 … (native subprocess)
│
pre-exec policy gate
(runtime.Evaluator.Eval)
│
┌───────┴──────────┐
Allow Deny
│ │
subprocess ErrPolicyDenied
starts returned, nothing
spawned
Dual-path execution¶
The dual-path resolver (runtime/dualpath) probes the host environment at
call time and returns one of two modes:
Mode |
Condition |
Security posture |
|---|---|---|
|
Docker daemon reachable |
Full isolation; active policy interception via governance sidecar |
|
Docker absent, |
Reduced governance; subprocess launched without container isolation |
When ModeNative is selected a REDUCED-GOVERNANCE warning is printed to
stderr:
[WARN] ros2: REDUCED-GOVERNANCE native mode — Docker unavailable; container
isolation and full policy interception are bypassed. …
Container path is required for production deployments.
ModeContainer requires --image to be set to a ROS2-capable image. The
adk-runtime base image does not bundle ROS2 toolchain binaries.
Pre-execution policy gate¶
Before any subprocess is spawned RunGoverned calls
opts.Evaluator.Eval(ctx, action) where action is derived from the args:
args[0] → action kind
args[1], args[2] → params (package, launch_file for ros2.launch)
A Deny decision returns ros2.ErrPolicyDenied immediately and no process is
started.
Default evaluator¶
ros2.DefaultEvaluator() enforces:
Action kind |
Decision |
|---|---|
|
Allow |
|
Allow |
|
Allow |
|
Deny (hard-coded, cannot be overridden) |
anything else |
Deny |
Use the default evaluator for development and CI demos.
Launch evaluator¶
ros2.NewLaunchEvaluator(opts) adds package-level and topic-level
allowlisting on top of the default rules. Packages are governed by
PackageRule (per-package allow flag + launch-file allowlist); topics are
governed by an ordered []TopicRule list (first match wins):
eval := ros2.NewLaunchEvaluator(ros2.LaunchPolicyOptions{
PackageRules: map[string]ros2.PackageRule{
"demo_robot": {
Allowed: true,
AllowedLaunchFiles: []string{"arm_demo.launch.py"},
},
"my_robot": {
Allowed: true, // all launch files permitted
},
},
TopicRules: []ros2.TopicRule{
{Pattern: "/arm/**", Allowed: true},
{Pattern: "/my_robot/**", Allowed: true},
},
})
When PackageRules is non-empty, any package not listed is denied
(fail-closed). Similarly when TopicRules is non-empty, a topic that matches
no rule is denied. A publish action for /unregistered/topic is denied even
if tool.ros2.* is otherwise allowed.
OPA policy layer¶
The bundle policy (a Rego file) is evaluated by the OPA engine inside the
governance sidecar. The demo policy (demo/bundles/ros2/policies/ros2_safety.rego)
is representative of a production policy:
package autonomy
import rego.v1
default allow := false
allow if {
input.kind == "tool.ros2.launch"
input.params.package == "demo_robot"
input.params.launch_file == "arm_demo.launch.py"
}
allow if {
input.kind == "tool.ros2.topic.subscribe"
startswith(input.params.topic, "/arm/")
}
allow if {
input.kind == "tool.ros2.topic.publish"
input.params.topic == "/arm/cmd_vel"
}
# … more rules …
# tool.shell is never allowed — explicit guard.
allow if { false } # tool.shell never matches any allow rule
All policies are fail-closed: default allow := false means an unrecognised
action kind is denied, not allowed.
Running the OPA policy tests¶
The demo policy ships with a Go-based test suite:
go test ./policy/... -run TestRos2DemoPolicy -v
If you have the OPA CLI installed, you can run tests directly:
opa test demo/bundles/ros2/policies/ -v
Writing a policy for your robot¶
Copy the template and fill in the CONFIGURE sections:
cp demo/bundles/ros2/policies/ros2_policy_template.rego \
demo/bundles/my_robot/policies/my_robot_safety.rego
Minimum changes:
Replace
demo_robotwith your package name.Replace
/arm/with your topic namespace.List your allowed service endpoints.
Run
opa testto verify no regressions.
RunGoverned — programmatic API (advanced integration)¶
The CLI commands above are the supported path for first use. Teams embedding
ROS2 governance into their own Go services can call runtime/ros2.RunGoverned
directly — the package is part of the open-source adk repository and is the
same code the CLI dispatches to. Use this path when the CLI’s flag surface
isn’t enough (custom evaluators, in-process supervision, integration tests).
import "github.com/autonomyops/adk/runtime/ros2"
opts := ros2.RunOptions{
Image: "ghcr.io/autonomyops/adk-ros2-runtime:v1.0.0",
Policy: "policies/arm_safety.rego",
RuntimeURL: "http://localhost:8080",
Evaluator: ros2.DefaultEvaluator(),
}
// RunGoverned signature: RunGoverned(ctx context.Context, args []string, opts RunOptions) error
args := []string{"launch", "demo_robot", "arm_demo.launch.py"}
if err := ros2.RunGoverned(ctx, args, opts); err != nil {
switch {
case errors.Is(err, ros2.ErrPolicyDenied):
log.Fatalf("policy denied: %v", err)
case errors.Is(err, ros2.ErrImageRequired):
log.Fatalf("image required for container path")
case errors.Is(err, ros2.ErrNeitherAvailable):
log.Fatalf("neither Docker nor ros2 available")
}
}
Governed node lifecycle¶
ros2.Lifecycle manages governed start/stop transitions for individual nodes:
lc := ros2.NewLifecycle(interceptor)
if err := lc.Start(ctx, "arm_controller"); err != nil {
// policy denied or interceptor error
}
// … later …
if err := lc.Stop(ctx, "arm_controller"); err != nil {
// …
}
Each Start / Stop call is evaluated by the interceptor with
lifecycle.start / lifecycle.stop before any subprocess action is taken. A
Deny result returns an error without starting or stopping any process.
See also¶
ROS2 Markers and Observability — PASS markers and WAL
Bundle Workflows — pull, inspect, stage, activate
Hardware Adaptation — adapting to production hardware
runtime/ros2/— Go package source