Stream Actions
Use StreamActions when a client needs to receive Action assignments and lifecycle changes in real time.
The stream filters by principals assigned to execute Actions, principals who requested Actions, or principals who authorized Actions.
StreamActions is a server-streaming RPC and is not available through standard REST/HTTP.
Use one of the SDK clients or another gRPC-capable client.
|
package main
import (
"connectrpc.com/connect"
"context"
"fmt"
"log"
wdmsdk "github.com/raft-tech/raft-wdm-sdk-go"
pb "github.com/raft-tech/raft-wdm-sdk-go/gen/raft/wdm/v1"
svc "github.com/raft-tech/raft-wdm-sdk-go/gen/raft/wdm/v1/service"
)
// main loads the SDK configuration, opens a StreamActions subscription for
// Actions assigned to the object principal "uav-shadow-07" in live-only mode
// with a 30000 ms heartbeat period, and prints every event it receives.
// Any setup or stream error terminates the program via log.Fatal.
func main() {
	cfg, err := wdmsdk.LoadConfig()
	if err != nil {
		log.Fatal(err)
	}

	client, err := wdmsdk.NewFromConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	stream, err := client.ActionService().StreamActions(context.Background(), connect.NewRequest(&svc.StreamActionsRequest{
		AssignedTo: []*pb.Principal{
			{
				Id:   "uav-shadow-07",
				Type: pb.PrincipalType_PRINCIPAL_TYPE_OBJECT,
			},
		},
		StreamMode:            svc.ActionStreamMode_ACTION_STREAM_MODE_LIVE_ONLY,
		HeartbeatPeriodMillis: 30000,
	}))
	if err != nil {
		log.Fatal(err)
	}

	// Loop until Receive reports that no further message is available.
	for stream.Receive() {
		event := stream.Msg()
		// Heartbeat messages carry no Action, so report them and move on.
		if event.GetEventType() == svc.ActionEventType_ACTION_EVENT_TYPE_HEARTBEAT {
			fmt.Println("heartbeat")
			continue
		}
		action := event.GetAction()
		fmt.Printf("event: %s action: %s %s\n", event.GetEventType(), action.GetId(), action.GetName())
	}

	// Capture the receive error first, then release the stream explicitly:
	// log.Fatal exits the process without running deferred calls, so a
	// deferred Close would be skipped on the error path.
	recvErr := stream.Err()
	if closeErr := stream.Close(); closeErr != nil {
		log.Printf("closing action stream: %v", closeErr)
	}
	if recvErr != nil {
		log.Fatal(recvErr)
	}
}
import com.raft.wdm.Wdm;
import com.raft.wdm.raft.wdm.v1.Principal;
import com.raft.wdm.raft.wdm.v1.PrincipalType;
import com.raft.wdm.raft.wdm.v1.service.ActionEventType;
import com.raft.wdm.raft.wdm.v1.service.ActionStreamMode;
import com.raft.wdm.raft.wdm.v1.service.StreamActionsRequest;
import com.raft.wdm.raft.wdm.v1.service.StreamActionsResponse;
import com.raft.wdm.v1.WdmV1Client;
/**
 * Example client that subscribes to the Action stream for a single assigned
 * principal and prints each event it receives.
 */
public final class StreamActionsExample {

    /**
     * Builds the subscription request: Actions assigned to the object principal
     * "uav-shadow-07", live-only mode, with a 30000 ms heartbeat period.
     */
    private static StreamActionsRequest buildRequest() {
        var assignee = Principal.newBuilder()
                .setId("uav-shadow-07")
                .setType(PrincipalType.PRINCIPAL_TYPE_OBJECT);
        return StreamActionsRequest.newBuilder()
                .addAssignedTo(assignee)
                .setStreamMode(ActionStreamMode.ACTION_STREAM_MODE_LIVE_ONLY)
                .setHeartbeatPeriodMillis(30000)
                .build();
    }

    /** Opens the stream and prints events until the stream ends. */
    public static void main(String[] args) {
        var options = Wdm.toOptions(Wdm.loadConfig(null, null));
        // Both the client and the stream are closed by try-with-resources.
        try (var client = WdmV1Client.create(options)) {
            var request = buildRequest();
            try (var stream = client.getActionServiceBlocking().streamActions(request)) {
                for (StreamActionsResponse event : stream) {
                    var eventType = event.getEventType();
                    if (eventType != ActionEventType.ACTION_EVENT_TYPE_HEARTBEAT) {
                        var action = event.getAction();
                        System.out.printf("event: %s action: %s %s%n",
                                eventType, action.getId(), action.getName());
                    } else {
                        // Heartbeat messages carry no Action to report.
                        System.out.println("heartbeat");
                    }
                }
            }
        }
    }
}
import asyncio
from raft.wdm.v1 import common_pb2
from raft.wdm.v1.service import action_service_pb2
import raft_wdm_sdk
def _event_type_name(value: int) -> str:
    """Return the symbolic name of an Action event type value.

    Falls back to the decimal value when the number is not known to the
    generated enum, so an unrecognized event type does not abort the stream loop.
    """
    # NOTE(review): assumes the generated module exposes the enum wrapper as
    # ActionEventType, the name the other SDK examples use — confirm.
    try:
        return action_service_pb2.ActionEventType.Name(value)
    except ValueError:
        return str(value)


async def main() -> None:
    """Subscribe to live Action events assigned to "uav-shadow-07" and print them."""
    async with raft_wdm_sdk.Client.from_config(raft_wdm_sdk.load_config()) as client:
        stream = await client.action_service.stream_actions(
            action_service_pb2.StreamActionsRequest(
                assigned_to=[
                    common_pb2.Principal(
                        id="uav-shadow-07",
                        type=common_pb2.PRINCIPAL_TYPE_OBJECT,
                    ),
                ],
                stream_mode=action_service_pb2.ACTION_STREAM_MODE_LIVE_ONLY,
                heartbeat_period_millis=30000,
            ),
        )
        async for event in stream:
            # Heartbeat messages carry no Action, so report them and move on.
            if event.event_type == action_service_pb2.ACTION_EVENT_TYPE_HEARTBEAT:
                print("heartbeat")
                continue
            # Print the enum name rather than its integer value, matching the
            # output of the Go, Java, and TypeScript examples.
            event_name = _event_type_name(event.event_type)
            print(f"event: {event_name} action: {event.action.id} {event.action.name}")


asyncio.run(main())
import { create } from "@bufbuild/protobuf";
import { createClient, fromNodeConfig, loadConfig } from "@raft-tech/raft-wdm-sdk-typescript";
import { PrincipalType } from "@raft-tech/raft-wdm-sdk-typescript/gen/raft/wdm/v1/common_pb.js";
import {
ActionEventType,
ActionStreamMode,
StreamActionsRequestSchema,
} from "@raft-tech/raft-wdm-sdk-typescript/gen/raft/wdm/v1/service/action_service_pb.js";
// Build the SDK client from the Node configuration.
const wdm = createClient(...fromNodeConfig(loadConfig()));

// Subscribe to Actions assigned to one object principal, live updates only,
// with a 30000 ms heartbeat period.
const request = create(StreamActionsRequestSchema, {
  assignedTo: [{ id: "uav-shadow-07", type: PrincipalType.OBJECT }],
  streamMode: ActionStreamMode.LIVE_ONLY,
  heartbeatPeriodMillis: 30000,
});
const events = wdm.actionService.streamActions(request);

for await (const { eventType, action } of events) {
  if (eventType !== ActionEventType.HEARTBEAT) {
    // Reverse-map the numeric enum value to its name for display.
    const label = ActionEventType[eventType];
    console.log(`event: ${label} action: ${action?.id} ${action?.name}`);
  } else {
    // Heartbeat messages carry no Action to report.
    console.log("heartbeat");
  }
}
Request Shape
Principal filters combine with OR logic.
An Action appears if it matches any requested, assigned, or authorized principal filter.
Set streamMode to choose whether the stream includes initial state, live updates, or both.
{
"assignedTo": [
{
"id": "uav-shadow-07",
"type": "PRINCIPAL_TYPE_OBJECT"
}
],
"streamMode": "ACTION_STREAM_MODE_LIVE_ONLY",
"heartbeatPeriodMillis": 30000
}
The full contract is defined in the WDM protobuf spec.
In the WDM distribution, see raft/wdm/v1/service/action_service.proto for StreamActionsRequest, StreamActionsResponse, stream modes, and event types.
Stream Behavior
- ACTION_STREAM_MODE_UNSPECIFIED sends initial non-terminal matching Actions and then continues with live updates.
- ACTION_STREAM_MODE_INITIAL_ONLY sends the initial matching queue and closes the stream.
- ACTION_STREAM_MODE_LIVE_ONLY skips initial state and emits only changes that happen after subscription.
Each message the client receives is a StreamActionsResponse.
That response carries an eventType and, except for heartbeat messages, the Action associated with the event.
The examples above read eventType inside the stream loop to decide whether to handle an Action update or ignore a heartbeat.
{
"eventType": "ACTION_EVENT_TYPE_UPDATED",
"timestamp": "2026-05-27T21:31:04Z",
"action": {
"id": "isr-collect-alpha",
"name": "ISR Collection Alpha",
"type": "isr_collection",
"scope": "ACTION_SCOPE_TASK",
"state": "ACTION_STATE_IN_PROGRESS",
"priority": "ACTION_PRIORITY_HIGH",
"progress": {
"phase": "ON_STATION",
"statusMessage": "Asset on station, commencing collection orbit"
},
"assignedTo": {
"id": "uav-shadow-07",
"type": "PRINCIPAL_TYPE_OBJECT"
},
"provenance": {
"name": "uav-shadow-07",
"updatedAt": "2026-05-27T21:31:04Z"
}
}
}
- ACTION_EVENT_TYPE_INITIAL means the Action existed when the stream was established.
- ACTION_EVENT_TYPE_CREATED means an Action was newly created or assigned.
- ACTION_EVENT_TYPE_UPDATED means state or progress changed.
- ACTION_EVENT_TYPE_CANCELLED means the Action was cancelled.
- ACTION_EVENT_TYPE_HEARTBEAT is a keepalive message sent when a heartbeat period is configured.
Principals that receive Actions on this stream should use Update State to report acknowledgement, planning, execution progress, completion, failure, rejection, or cancellation.