Stream Actions

Use StreamActions when a client needs to receive Action assignments and lifecycle changes in real time. The stream filters by principals assigned to execute Actions, principals who requested Actions, or principals who authorized Actions.

StreamActions is a server-streaming RPC and is not available through standard REST/HTTP. Use one of the SDK clients or another gRPC-capable client.
package main

import (
	"connectrpc.com/connect"
	"context"
	"fmt"
	"log"

	wdmsdk "github.com/raft-tech/raft-wdm-sdk-go"
	pb "github.com/raft-tech/raft-wdm-sdk-go/gen/raft/wdm/v1"
	svc "github.com/raft-tech/raft-wdm-sdk-go/gen/raft/wdm/v1/service"
)

func main() {
	cfg, err := wdmsdk.LoadConfig()
	if err != nil {
		log.Fatal(err)
	}

	client, err := wdmsdk.NewFromConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	stream, err := client.ActionService().StreamActions(context.Background(), connect.NewRequest(&svc.StreamActionsRequest{
		AssignedTo: []*pb.Principal{
			{
				Id:   "uav-shadow-07",
				Type: pb.PrincipalType_PRINCIPAL_TYPE_OBJECT,
			},
		},
		StreamMode:            svc.ActionStreamMode_ACTION_STREAM_MODE_LIVE_ONLY,
		HeartbeatPeriodMillis: 30000,
	}))
	if err != nil {
		log.Fatal(err)
	}

	for stream.Receive() {
		event := stream.Msg()
		if event.GetEventType() == svc.ActionEventType_ACTION_EVENT_TYPE_HEARTBEAT {
			fmt.Println("heartbeat")
			continue
		}

		action := event.GetAction()
		fmt.Printf("event: %s action: %s %s\n", event.GetEventType(), action.GetId(), action.GetName())
	}
	if err := stream.Err(); err != nil {
		log.Fatal(err)
	}
}
import com.raft.wdm.Wdm;
import com.raft.wdm.raft.wdm.v1.Principal;
import com.raft.wdm.raft.wdm.v1.PrincipalType;
import com.raft.wdm.raft.wdm.v1.service.ActionEventType;
import com.raft.wdm.raft.wdm.v1.service.ActionStreamMode;
import com.raft.wdm.raft.wdm.v1.service.StreamActionsRequest;
import com.raft.wdm.raft.wdm.v1.service.StreamActionsResponse;
import com.raft.wdm.v1.WdmV1Client;

public final class StreamActionsExample {
  public static void main(String[] args) {
    var options = Wdm.toOptions(Wdm.loadConfig(null, null));

    try (var client = WdmV1Client.create(options)) {
      var req = StreamActionsRequest.newBuilder()
          .addAssignedTo(Principal.newBuilder()
              .setId("uav-shadow-07")
              .setType(PrincipalType.PRINCIPAL_TYPE_OBJECT))
          .setStreamMode(ActionStreamMode.ACTION_STREAM_MODE_LIVE_ONLY)
          .setHeartbeatPeriodMillis(30000)
          .build();

      try (var stream = client.getActionServiceBlocking().streamActions(req)) {
        for (StreamActionsResponse event : stream) {
          if (event.getEventType() == ActionEventType.ACTION_EVENT_TYPE_HEARTBEAT) {
            System.out.println("heartbeat");
            continue;
          }

          var action = event.getAction();
          System.out.printf("event: %s action: %s %s%n",
              event.getEventType(), action.getId(), action.getName());
        }
      }
    }
  }
}
import asyncio

from raft.wdm.v1 import common_pb2
from raft.wdm.v1.service import action_service_pb2

import raft_wdm_sdk


async def main() -> None:
    async with raft_wdm_sdk.Client.from_config(raft_wdm_sdk.load_config()) as client:
        stream = await client.action_service.stream_actions(
            action_service_pb2.StreamActionsRequest(
                assigned_to=[
                    common_pb2.Principal(
                        id="uav-shadow-07",
                        type=common_pb2.PRINCIPAL_TYPE_OBJECT,
                    ),
                ],
                stream_mode=action_service_pb2.ACTION_STREAM_MODE_LIVE_ONLY,
                heartbeat_period_millis=30000,
            ),
        )

        async for event in stream:
            if event.event_type == action_service_pb2.ACTION_EVENT_TYPE_HEARTBEAT:
                print("heartbeat")
                continue

            print(f"event: {event.event_type} action: {event.action.id} {event.action.name}")


asyncio.run(main())
import { create } from "@bufbuild/protobuf";
import { createClient, fromNodeConfig, loadConfig } from "@raft-tech/raft-wdm-sdk-typescript";
import { PrincipalType } from "@raft-tech/raft-wdm-sdk-typescript/gen/raft/wdm/v1/common_pb.js";
import {
  ActionEventType,
  ActionStreamMode,
  StreamActionsRequestSchema,
} from "@raft-tech/raft-wdm-sdk-typescript/gen/raft/wdm/v1/service/action_service_pb.js";

const client = createClient(...fromNodeConfig(loadConfig()));

const stream = client.actionService.streamActions(
  create(StreamActionsRequestSchema, {
    assignedTo: [
      {
        id: "uav-shadow-07",
        type: PrincipalType.OBJECT,
      },
    ],
    streamMode: ActionStreamMode.LIVE_ONLY,
    heartbeatPeriodMillis: 30000,
  }),
);

for await (const event of stream) {
  if (event.eventType === ActionEventType.HEARTBEAT) {
    console.log("heartbeat");
    continue;
  }

  console.log(
    `event: ${ActionEventType[event.eventType]} action: ${event.action?.id} ${event.action?.name}`,
  );
}

Request Shape

Principal filters combine with OR logic. An Action appears if it matches any requested, assigned, or authorized principal filter. Set streamMode to choose whether the stream includes initial state, live updates, or both.

{
  "assignedTo": [
    {
      "id": "uav-shadow-07",
      "type": "PRINCIPAL_TYPE_OBJECT"
    }
  ],
  "streamMode": "ACTION_STREAM_MODE_LIVE_ONLY",
  "heartbeatPeriodMillis": 30000
}

The full contract is defined in the WDM protobuf spec. In the WDM distribution, see raft/wdm/v1/service/action_service.proto for StreamActionsRequest, StreamActionsResponse, stream modes, and event types.

Stream Behavior

  • ACTION_STREAM_MODE_UNSPECIFIED sends initial non-terminal matching Actions and then continues with live updates.

  • ACTION_STREAM_MODE_INITIAL_ONLY sends the initial matching queue and closes the stream.

  • ACTION_STREAM_MODE_LIVE_ONLY skips initial state and emits only changes that happen after subscription.

Each message the client receives is a StreamActionsResponse. That response carries an eventType and, except for heartbeat messages, the Action associated with the event. The examples above read eventType inside the stream loop to decide whether to handle an Action update or ignore a heartbeat.

{
  "eventType": "ACTION_EVENT_TYPE_UPDATED",
  "timestamp": "2026-05-27T21:31:04Z",
  "action": {
    "id": "isr-collect-alpha",
    "name": "ISR Collection Alpha",
    "type": "isr_collection",
    "scope": "ACTION_SCOPE_TASK",
    "state": "ACTION_STATE_IN_PROGRESS",
    "priority": "ACTION_PRIORITY_HIGH",
    "progress": {
      "phase": "ON_STATION",
      "statusMessage": "Asset on station, commencing collection orbit"
    },
    "assignedTo": {
      "id": "uav-shadow-07",
      "type": "PRINCIPAL_TYPE_OBJECT"
    },
    "provenance": {
      "name": "uav-shadow-07",
      "updatedAt": "2026-05-27T21:31:04Z"
    }
  }
}
  • ACTION_EVENT_TYPE_INITIAL means the Action existed when the stream was established.

  • ACTION_EVENT_TYPE_CREATED means an Action was newly created or assigned.

  • ACTION_EVENT_TYPE_UPDATED means state or progress changed.

  • ACTION_EVENT_TYPE_CANCELLED means the Action was cancelled.

  • ACTION_EVENT_TYPE_HEARTBEAT is a keepalive message when heartbeat is configured.

Principals that receive Actions on this stream should use Update State to report acknowledgement, planning, execution progress, completion, failure, rejection, or cancellation.