AlphornAlphorn Docs

Real-time Streaming

Stream real-time notifications via Server-Sent Events (SSE) to build live dashboards and custom UIs.

Alphorn supports real-time notification streaming via Server-Sent Events (SSE). Build live dashboards, integrate with custom UIs, or connect to any system that supports SSE.

How It Works

The Live Stream (SSE) channel is built into Alphorn. When enabled, you can subscribe to a stream URL and receive notifications as they arrive — no polling required.

Connecting to a Stream

JavaScript (browser or Node.js)

const eventSource = new EventSource(
  "https://app.alphorn.dev/api/stream/wh_abc123"
);

eventSource.onmessage = (event) => {
  const notification = JSON.parse(event.data);
  console.log(notification.title, notification.message);
};

eventSource.onerror = (error) => {
  console.error("Stream error:", error);
};

cURL

curl -N https://app.alphorn.dev/api/stream/wh_abc123

The -N flag disables buffering so events appear immediately. Press Ctrl+C to stop.

Python

import requests
import json

response = requests.get(
    "https://app.alphorn.dev/api/stream/wh_abc123",
    stream=True,
    headers={"Accept": "text/event-stream"},
)

for line in response.iter_lines():
    if line:
        decoded = line.decode("utf-8")
        if decoded.startswith("data: "):
            notification = json.loads(decoded[6:])
            print(f"{notification['title']}: {notification['message']}")

Go

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("https://app.alphorn.dev/api/stream/wh_abc123")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data: ") {
			var notification map[string]interface{}
			json.Unmarshal([]byte(line[6:]), &notification)
			fmt.Printf("%s: %s\n", notification["title"], notification["message"])
		}
	}
}

Event Format

Each event is a JSON object:

{
  "id": "msg_abc123",
  "title": "Deploy Complete",
  "message": "v2.1.0 deployed to production",
  "priority": 3,
  "tags": ["deploy", "production"],
  "timestamp": "2025-01-15T10:30:00Z"
}

Reconnection

SSE connections may drop due to network issues, proxy timeouts, or server restarts. The browser's EventSource API reconnects automatically. For other clients, implement reconnection with backoff:

import time
import requests

def stream_with_reconnect(url, max_retries=10):
    retries = 0
    while retries < max_retries:
        try:
            response = requests.get(url, stream=True, headers={"Accept": "text/event-stream"})
            retries = 0  # Reset on successful connection
            for line in response.iter_lines():
                if line:
                    print(line.decode("utf-8"))
        except requests.exceptions.ConnectionError:
            retries += 1
            wait = min(30, 2 ** retries)
            print(f"Connection lost, reconnecting in {wait}s...")
            time.sleep(wait)

If you're running behind a reverse proxy, increase the read timeout to prevent the proxy from closing idle connections:

# Nginx
proxy_read_timeout 86400;
proxy_buffering off;

Use Cases

  • Live dashboards — Display notifications in real-time on a monitoring screen
  • Custom notification UIs — Build your own notification center
  • Automation — Trigger actions in response to notifications
  • Logging — Stream notifications to a custom logging pipeline

On this page