Logger.configure(level: :info)
# For ffmpeg and ffplay commands to work on Mac Livebook Desktop
System.put_env("PATH", "/opt/homebrew/bin:#{System.get_env("PATH")}")
# In case of problems installing Nx/EXLA/Bumblebee,
# you can remove them and the Nx backend config below.
# Examples that don't mention them should still work.
Mix.install([
  :boombox,
  :kino,
  :nx,
  :exla,
  :bumblebee,
  :websockex,
  :req,
  :image,
  :membrane_simple_rtsp_server
])
Nx.global_default_backend(EXLA.Backend)
👋 Here are some examples of using Boombox. Some of them use ffmpeg to generate streams, and some use ffplay to play back the generated videos, but you can use any other player, for example VLC.
The cell below downloads assets to be used in the examples, and runs an HTTP server on port 1234
that serves static HTML files for sending/receiving the stream in the browser.
data_dir = "/tmp/boombox_examples_data"
input_dir = "#{data_dir}/input"
File.mkdir_p!(input_dir)
out_dir = "#{data_dir}/output"
File.mkdir_p!(out_dir)
samples_url =
"https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples"
unless File.exists?("#{input_dir}/bun.mp4") do
%{status: 200, body: bbb_mp4} = Req.get!("#{samples_url}/big-buck-bunny/bun33s.mp4")
File.write!("#{input_dir}/bun.mp4", bbb_mp4)
end
unless File.exists?("#{input_dir}/bun.mkv") do
%{status: 200, body: bbb_mkv} = Req.get!("#{samples_url}/big-buck-bunny/bun33s.mkv")
File.write!("#{input_dir}/bun.mkv", bbb_mkv)
end
assets_url =
"https://raw.githubusercontent.com/membraneframework/boombox/master/boombox_examples_data"
for asset <- ["hls", "webrtc_from_browser", "webrtc_to_browser", "talk_to_llm"],
path = "#{data_dir}/#{asset}.html",
not File.exists?(path) do
%{status: 200, body: data} = Req.get!("#{assets_url}/#{asset}.html")
File.write!(path, data)
end
# HTTP server for assets
:inets.stop()
:ok = :inets.start()
{:ok, _server} =
:inets.start(:httpd,
bind_address: ~c"localhost",
port: 1234,
document_root: ~c"#{data_dir}",
server_name: ~c"assets_server",
server_root: ~c"/tmp",
erl_script_nocache: true
)
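Once the cell above has run, a quick request confirms the asset server is up (a minimal check; any of the downloaded pages works):
Req.get!("http://localhost:1234/hls.html").status
# expected: 200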
Visit http://localhost:1234/webrtc_from_browser.html to send the stream and http://localhost:1234/webrtc_to_browser.html to receive it
Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: {:webrtc, "ws://localhost:8830"})
To send the stream, visit http://localhost:1234/webrtc_from_browser.html.
Note: don't stop this cell to finish the recording; click 'disconnect' or close the browser tab instead, so that the recording is finalized properly.
Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: "#{out_dir}/webrtc_to_mp4.mp4")
System.shell("ffplay #{out_dir}/webrtc_to_mp4.mp4")
To receive the stream, visit http://localhost:1234/hls.html after running the cell below
Boombox.run(input: "#{input_dir}/bun.mp4", output: {:hls, "#{out_dir}/index.m3u8"})
To receive the stream, visit http://localhost:1234/hls.html after running the cell below
uri = "rtmp://localhost:5432"
t =
Task.async(fn ->
Boombox.run(input: uri, output: "#{out_dir}/index.m3u8")
end)
{_output, 0} = System.shell("ffmpeg -re -i #{input_dir}/bun.mp4 -c copy -f flv #{uri}")
Task.await(t)
Visit http://localhost:1234/webrtc_from_browser.html to send the stream and http://localhost:1234/hls.html to receive it
Boombox.run(input: {:webrtc, "ws://localhost:8829"}, output: {:hls, "#{out_dir}/index.m3u8"})
Kino.start_child!({
Membrane.RTMPServer,
handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()},
port: 5001,
use_ssl?: false,
handle_new_client: fn client_ref, app, stream_key ->
hls_dir = "#{out_dir}/#{stream_key}"
File.mkdir_p!(hls_dir)
Task.start(fn ->
Boombox.run(input: {:rtmp, client_ref}, output: "#{hls_dir}/index.m3u8")
end)
Kino.Markdown.new("""
A new streamer connected with app #{app} and stream key #{stream_key}.
The stream will be available at http://localhost:1234/hls.html?src=output/#{stream_key}/index.m3u8.
It may take a few seconds before the stream is playable.
""")
|> Kino.render()
end,
client_timeout: 1_000
})
button = Kino.Control.button("Connect streamer")
Kino.render(button)
button
|> Stream.filter(fn event -> event.type == :click end)
|> Kino.async_listen(fn _event ->
key = Base.encode16(:crypto.strong_rand_bytes(4))
uri = "rtmp://localhost:5001/streamer/#{key}"
{_output, 0} = System.shell("ffmpeg -re -i #{input_dir}/bun.mp4 -c copy -f flv #{uri}")
end)
:ok
This example lets you have a natural voice conversation with ChatGPT. Boombox is used to deliver audio between the browser and the server. It uses WebRTC, which is probably the best option for this use case.
The module below is a simple interface to the OpenAI realtime audio API. It accepts PCM audio (1 channel, 24 kHz, s16le) and responds in the same format. Thanks to that, we need neither speech-to-text nor text-to-speech, which results in very low latency and simple logic.
If you prefer open-source solutions, there's Ultravox, but while it accepts audio, it currently outputs only text, so you'd need TTS. If there's anything else we should link here, please open a PR.
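As a quick sanity check on the PCM format above: s16le means 2 bytes per sample, so the chunk sizes used later in this example follow directly from the sample rate. A minimal back-of-the-envelope sketch:
# 24 kHz mono s16le: 2 bytes per sample
bytes_per_second = 24_000 * 2
# one 20 ms chunk, as used in the egress cell below
div(bytes_per_second * 20, 1_000)
# => 960 bytes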
defmodule OpenAIWebSocket do
use WebSockex
require Logger
def start_link(opts) do
# OpenAI API docs: https://platform.openai.com/docs/guides/realtime
WebSockex.start_link(
"wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01",
__MODULE__,
%{response: <<>>},
extra_headers: [
{"Authorization", "Bearer " <> opts[:token]},
{"OpenAI-Beta", "realtime=v1"}
]
)
end
def send_audio(ws, audio) do
audio = Base.encode64(audio)
frame = %{type: "input_audio_buffer.append", audio: audio} |> Jason.encode!()
WebSockex.send_frame(ws, {:text, frame})
end
def get_response_chunk(ws, chunk_byte_size) do
# WebSockex has no equivalent of GenServer.call, so we send a message and await the reply ourselves
send(ws, {:get_response_chunk, chunk_byte_size, self()})
receive do
{:response_chunk, chunk} -> chunk
end
end
@impl true
def handle_frame({:text, frame}, state) do
case Jason.decode!(frame) do
%{"type" => "response.audio.delta", "delta" => delta} ->
audio_payload = Base.decode64!(delta)
# Buffer the response audio
response = state.response <> audio_payload
{:ok, %{state | response: response}}
%{"type" => "input_audio_buffer.speech_started"} ->
# If the user speaks, they may interrupt the current response,
# so we drop it and wait for a new one.
{:ok, %{state | response: <<>>}}
%{"type" => "response.audio_transcript.done", "transcript" => transcript} ->
Logger.info("AI transcription: #{transcript}")
{:ok, state}
%{} = _event ->
{:ok, state}
end
end
@impl true
def handle_frame(_frame, state), do: {:ok, state}
@impl true
def handle_info({:get_response_chunk, size, pid}, state) do
case state.response do
<<chunk::binary-size(size), rest::binary>> ->
# If we have enough data, send it back
send(pid, {:response_chunk, chunk})
{:ok, %{state | response: rest}}
chunk ->
# Otherwise, send what we have, padded with silence
silence = <<0::size(size - byte_size(chunk))-unit(8)>>
send(pid, {:response_chunk, chunk <> silence})
{:ok, %{state | response: <<>>}}
end
end
end
In the cell below, we receive the stream from the browser via WebRTC, feed it to the API, receive the response and send it back to the browser. You need to add your OpenAI API token as an OPEN_AI_TOKEN
secret in Livebook for this to work. To connect via WebRTC, visit http://localhost:1234/talk_to_llm.html after running this cell.
{:ok, ws} = OpenAIWebSocket.start_link(token: System.fetch_env!("LB_OPEN_AI_TOKEN"))
# Ingress part
Task.start_link(fn ->
Boombox.run(
# Connect to the browser via WebRTC, using WebSocket for session establishment
input: {:webrtc, "ws://localhost:8829"},
output: {
:stream,
# Audio format that the OpenAI API expects
video: false, audio: :binary, audio_format: :s16le, audio_channels: 1, audio_rate: 24_000
}
)
|> Enum.each(fn packet -> OpenAIWebSocket.send_audio(ws, packet.payload) end)
end)
# Egress part
# We send 20 millisecond chunks to Boombox
chunk_duration_ms = 20
# Samples per second * bytes per sample * chunk duration in seconds
chunk_byte_size = trunc(24_000 * 2 * chunk_duration_ms / 1_000)
Stream.interval(chunk_duration_ms)
# Stream.interval emits consecutive integers (0, 1, 2, ...) every 20 ms;
# multiplying by the chunk duration turns them into timestamps in milliseconds (0, 20, 40, ...)
|> Stream.map(&(&1 * chunk_duration_ms))
|> Stream.map(fn time ->
response_chunk = OpenAIWebSocket.get_response_chunk(ws, chunk_byte_size)
%Boombox.Packet{
payload: response_chunk,
kind: :audio,
pts: Membrane.Time.milliseconds(time),
# Audio format that the OpenAI API outputs
format: %{audio_format: :s16le, audio_channels: 1, audio_rate: 24_000}
}
end)
|> Boombox.run(
input: {:stream, audio: :binary, video: false},
# Connect to the browser via WebRTC, using WebSocket for session establishment
output: {:webrtc, "ws://localhost:8830"}
)
{:ok, whisper} = Bumblebee.load_model({:hf, "openai/whisper-tiny"})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, "openai/whisper-tiny"})
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai/whisper-tiny"})
{:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai/whisper-tiny"})
serving =
Bumblebee.Audio.speech_to_text_whisper(
whisper,
featurizer,
tokenizer,
generation_config,
defn_options: [compiler: EXLA]
)
Boombox.run(
input: "#{samples_url}/sherlock_librivox.mp4",
output:
{:stream,
video: false, audio: :binary, audio_rate: 16_000, audio_channels: 1, audio_format: :f32le}
)
|> Stream.map(&Nx.from_binary(&1.payload, :f32))
|> Stream.chunk_every(200)
|> Enum.each(fn chunk ->
batch = Nx.concatenate(chunk)
Nx.Serving.run(serving, batch).chunks
|> Enum.map_join(& &1.text)
|> IO.puts()
end)
To send the stream, visit http://localhost:1234/webrtc_from_browser.html
{:ok, whisper} = Bumblebee.load_model({:hf, "openai/whisper-tiny"})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, "openai/whisper-tiny"})
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai/whisper-tiny"})
{:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai/whisper-tiny"})
serving =
Bumblebee.Audio.speech_to_text_whisper(
whisper,
featurizer,
tokenizer,
generation_config,
defn_options: [compiler: EXLA]
)
Boombox.run(
input: {:webrtc, "ws://localhost:8829"},
output:
{:stream,
video: false, audio: :binary, audio_rate: 16_000, audio_channels: 1, audio_format: :f32le}
)
|> Stream.map(&Nx.from_binary(&1.payload, :f32))
|> Stream.chunk_every(200)
|> Enum.each(fn chunk ->
batch = Nx.concatenate(chunk)
Nx.Serving.run(serving, batch).chunks
|> Enum.map_join(& &1.text)
|> IO.puts()
end)
To receive the stream, visit http://localhost:1234/webrtc_to_browser.html
overlay =
Req.get!("https://avatars.githubusercontent.com/u/25247695?s=200&v=4").body
|> Vix.Vips.Image.new_from_buffer()
|> then(fn {:ok, img} -> img end)
|> Image.trim!()
|> Image.thumbnail!(100)
bg = Image.new!(640, 480, color: :light_gray)
max_x = Image.width(bg) - Image.width(overlay)
max_y = Image.height(bg) - Image.height(overlay)
Stream.iterate({_x = 300, _y = 0, _dx = 1, _dy = 2, _pts = 0}, fn {x, y, dx, dy, pts} ->
  # Bounce: flip direction whenever the next step would leave the background
  dx = if (x + dx) in 0..max_x, do: dx, else: -dx
  dy = if (y + dy) in 0..max_y, do: dy, else: -dy
  # Advance the presentation timestamp by one frame duration at 60 fps
  pts = pts + div(Membrane.Time.seconds(1), _fps = 60)
  {x + dx, y + dy, dx, dy, pts}
end)
|> Stream.map(fn {x, y, _dx, _dy, pts} ->
img = Image.compose!(bg, overlay, x: x, y: y)
%Boombox.Packet{kind: :video, payload: img, pts: pts}
end)
|> Boombox.run(
input: {:stream, video: :image, audio: false},
output: {:webrtc, "ws://localhost:8830"}
)
Inspired by Silicon Valley's "Not Hotdog" app and Evadne Wu's talk.
To send the stream, visit http://localhost:1234/webrtc_from_browser.html.
model_name = "google/vit-base-patch16-224"
{:ok, resnet} = Bumblebee.load_model({:hf, model_name})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, model_name})
serving = Bumblebee.Vision.image_classification(resnet, featurizer)
:ok
frame = Kino.Frame.new()
Kino.render(frame)
Boombox.run(
input: {:webrtc, "ws://localhost:8829"},
output: {:stream, video: :image, audio: false}
)
# Classify only every 10th frame so inference keeps up with the incoming video
|> Stream.take_every(10)
|> Stream.map(fn frame ->
tensor =
frame.payload
|> Image.thumbnail!(224)
|> Image.embed!(224, 224)
|> Image.to_nx!()
hot_dog =
Nx.Serving.run(serving, tensor).predictions
|> Enum.find(fn p -> String.contains?(p.label, "hotdog") end)
if hot_dog do
"## ✅ Hotdog"
else
"## ❌ Not hotdog"
end
end)
|> Enum.each(fn text -> Kino.Frame.render(frame, Kino.Markdown.new(text)) end)
To receive the stream, visit http://localhost:1234/webrtc_to_browser.html
Note: due to a bug in Chrome, it may not work there unless Chrome is launched with --enable-features=WebRtcEncodedTransformDirectCallback. See https://issues.chromium.org/issues/351275970.
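For convenience, Chrome can be launched with that flag straight from the notebook. A hedged sketch, assuming Chrome on macOS (the open command and application name are macOS-specific):
System.shell(
  ~s(open -na "Google Chrome" --args --enable-features=WebRtcEncodedTransformDirectCallback)
)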
uri = "rtmp://localhost:5434"
t =
Task.async(fn ->
Boombox.run(input: uri, output: {:webrtc, "ws://localhost:8830"})
end)
{_output, 0} = System.shell("ffmpeg -re -i #{input_dir}/bun.mp4 -c copy -f flv #{uri}")
Task.await(t)
uri = "rtmp://localhost:5432"
t =
Task.async(fn ->
Boombox.run(input: uri, output: "#{out_dir}/rtmp_to_mp4.mp4")
end)
{_output, 0} = System.shell("ffmpeg -re -i #{input_dir}/bun.mp4 -c copy -f flv #{uri}")
Task.await(t)
System.shell("ffplay #{out_dir}/rtmp_to_mp4.mp4")
signaling = Membrane.WebRTC.SignalingChannel.new()
t =
Task.async(fn ->
Boombox.run(input: "#{input_dir}/bun.mp4", output: {:webrtc, signaling})
end)
Boombox.run(input: {:webrtc, signaling}, output: "#{out_dir}/mp4_webrtc_mp4.mp4")
Task.await(t)
System.shell("ffplay #{out_dir}/mp4_webrtc_mp4.mp4")
To receive the stream, visit http://localhost:1234/hls.html after running the cell below
rtsp_port = 8554
Membrane.SimpleRTSPServer.start_link("#{input_dir}/bun.mp4", port: rtsp_port)
Boombox.run(input: "rtsp://localhost:#{rtsp_port}/", output: "#{out_dir}/index.m3u8")
To receive the stream, visit http://localhost:1234/hls.html after running the cell below
rtp_port = 50001
t =
Task.async(fn ->
Boombox.run(
input:
  {:rtp,
   port: rtp_port,
   track_configs: [audio: [encoding: :OPUS], video: [encoding: :H264]]},
output: "#{out_dir}/index.m3u8"
)
end)
{_output, 0} =
System.shell("""
ffmpeg -re -i #{input_dir}/bun.mkv \
-map 0:v:0 -c:v copy -payload_type 96 -f rtp rtp://127.0.0.1:#{rtp_port} \
-map 0:a:0 -c:a copy -payload_type 120 -f rtp rtp://127.0.0.1:#{rtp_port}
""")
# RTP carries no end-of-stream signal, so once ffmpeg exits,
# give Boombox a moment to flush the last segments and stop the task.
Process.sleep(200)
Task.shutdown(t)
To receive the stream, visit http://localhost:1234/webrtc_to_browser.html after running the cell below.
Note: due to a bug in Chrome, it may not work there unless Chrome is launched with --enable-features=WebRtcEncodedTransformDirectCallback. See https://issues.chromium.org/issues/351275970.
Boombox.run(input: "#{input_dir}/bun.mp4", output: {:webrtc, "ws://localhost:8830"})
To receive the stream, visit http://localhost:1234/webrtc_to_browser.html after running the cell below.
Note: due to a bug in Chrome, it may not work there unless Chrome is launched with --enable-features=WebRtcEncodedTransformDirectCallback. See https://issues.chromium.org/issues/351275970.
Boombox.run(
input: "#{samples_url}/big-buck-bunny/bun33s.mp4",
output: {:webrtc, "ws://localhost:8830"}
)