Skip to content

Commit

Permalink
add run id to recordings (#466)
Browse files Browse the repository at this point in the history
* Add run id to recordings

* get recordings by run id

* lower time drift threshold

* update start date on discontinuity

* add correct run dates function
  • Loading branch information
gBillal committed Jul 27, 2024
1 parent b1e3f54 commit 8a975d4
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 41 deletions.
3 changes: 2 additions & 1 deletion apps/ex_nvr/lib/ex_nvr/model/recording.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ defmodule ExNVR.Model.Recording do
field :stream, Ecto.Enum, values: [:high, :low], default: :high

belongs_to :device, ExNVR.Model.Device
belongs_to :run, ExNVR.Model.Run, type: :integer
end

def with_type(query \\ __MODULE__, stream_type) do
Expand Down Expand Up @@ -98,7 +99,7 @@ defmodule ExNVR.Model.Recording do

def changeset(params) do
%__MODULE__{}
|> Changeset.cast(params, @required_fields ++ [:stream])
|> Changeset.cast(params, @required_fields ++ [:stream, :run_id])
|> Changeset.validate_required(@required_fields)
end
end
4 changes: 4 additions & 0 deletions apps/ex_nvr/lib/ex_nvr/pipeline/output/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ defmodule ExNVR.Pipeline.Output.Storage do
%{device_id: state.device.id, stream: state.stream}
)

unless run.active do
Membrane.Logger.info("run discontinuity: #{run.id}")
end

{maybe_new_run(state, run), recording}

{:error, error} ->
Expand Down
51 changes: 35 additions & 16 deletions apps/ex_nvr/lib/ex_nvr/pipeline/output/storage/segmenter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule ExNVR.Pipeline.Output.Storage.Segmenter do
alias Membrane.{Buffer, Event, H264, H265, Time}

@time_error Time.milliseconds(30)
@time_drift_threshold Time.seconds(30)
@jitter_buffer_delay Time.milliseconds(200)

def_options target_duration: [
Expand Down Expand Up @@ -155,12 +156,16 @@ defmodule ExNVR.Pipeline.Output.Storage.Segmenter do

defp handle_buffer(state, %Buffer{} = buffer) do
if Utils.keyframe(buffer) and Segment.duration(state.segment) >= state.target_duration do
state = finalize_segment(state, state.correct_timestamp)
{state, discontinuity} = finalize_segment(state, state.correct_timestamp)

actions =
[end_of_stream: Pad.ref(:output, state.start_time)] ++ completed_segment_action(state)
[end_of_stream: Pad.ref(:output, state.start_time)] ++
completed_segment_action(state, discontinuity)

start_time = Segment.end_date(state.segment)
start_time =
if discontinuity,
do: state.segment.wallclock_end_date,
else: Segment.end_date(state.segment)

state =
%{
Expand All @@ -186,7 +191,7 @@ defmodule ExNVR.Pipeline.Output.Storage.Segmenter do
end

defp do_handle_end_of_stream(state) do
state = finalize_segment(state)
{state, _discontinuity} = finalize_segment(state, false)

{[end_of_stream: Pad.ref(:output, state.start_time)] ++ completed_segment_action(state, true),
Map.merge(state, init_state())}
Expand Down Expand Up @@ -219,31 +224,45 @@ defmodule ExNVR.Pipeline.Output.Storage.Segmenter do
}
end

defp finalize_segment(%{segment: segment} = state, correct_timestamp \\ false) do
end_date = Time.os_time()
monotonic_end_date = Time.monotonic_time()
defp finalize_segment(%{segment: segment} = state, correct_timestamp) do
end_date = Time.os_time() - @jitter_buffer_delay
monotonic_duration = Time.monotonic_time() - state.monotonic_start_time

{segment, discontinuity?} =
maybe_correct_timestamp(segment, correct_timestamp, state, end_date)

segment =
segment
|> maybe_correct_timestamp(correct_timestamp, state, end_date)
|> Segment.with_realtime_duration(monotonic_end_date - state.monotonic_start_time)
|> then(&Segment.with_wall_clock_duration(&1, end_date - &1.start_date))
|> Segment.with_realtime_duration(monotonic_duration)
|> Segment.with_wall_clock_duration(end_date - segment.start_date)
|> then(&%{&1 | wallclock_end_date: end_date})

%{state | segment: %{segment | wallclock_end_date: end_date}}
{%{state | segment: segment}, discontinuity?}
end

defp maybe_correct_timestamp(segment, false, %{first_segment?: false}, _end_date), do: segment
defp maybe_correct_timestamp(segment, false, %{first_segment?: false}, _end_date),
do: {segment, false}

defp maybe_correct_timestamp(segment, true, %{first_segment?: false}, end_date) do
# clap the time diff between -@time_error and @time_error
time_diff = end_date - Segment.end_date(segment)
diff = time_diff |> max(-@time_error) |> min(@time_error)
Segment.add_duration(segment, diff)

if abs(time_diff) >= @time_drift_threshold do
Membrane.Logger.warning("""
Diff between segment end date and current date is more than #{Time.as_seconds(@time_drift_threshold, :round)} seconds
diff: #{Time.as_microseconds(time_diff, :round)}
""")

{segment, true}
else
diff = time_diff |> max(-@time_error) |> min(@time_error)
{Segment.add_duration(segment, diff), false}
end
end

defp maybe_correct_timestamp(segment, _correct_timestamp, _state, end_date) do
start_date = end_date - Segment.duration(segment) - @jitter_buffer_delay
%{segment | start_date: start_date, end_date: end_date}
start_date = end_date - Segment.duration(segment)
{%{segment | start_date: start_date, end_date: end_date}, false}
end

defp completed_segment_action(state, discontinuity \\ false) do
Expand Down
47 changes: 42 additions & 5 deletions apps/ex_nvr/lib/ex_nvr/recordings.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ defmodule ExNVR.Recordings do
params = if is_struct(params), do: Map.from_struct(params), else: params

with :ok <- copy_file(device, params, copy_file?) do
recording_changeset =
Multi.new()
|> Multi.insert(:run, run, on_conflict: {:replace_all_except, [:start_date]})
|> Multi.insert(:recording, fn %{run: run} ->
params
|> Map.put(:filename, recording_path(device, params) |> Path.basename())
|> Map.put(:run_id, run.id)
|> Recording.changeset()

Multi.new()
|> Multi.insert(:recording, recording_changeset)
|> Multi.insert(:run, run, on_conflict: {:replace_all_except, [:start_date]})
end)
|> Repo.transaction()
|> case do
{:ok, %{recording: recording, run: run}} ->
Expand Down Expand Up @@ -114,6 +114,43 @@ defmodule ExNVR.Recordings do
)
end

@doc """
Correct run and recordings dates in case of clock jumps (NTP sync)
"""
@spec correct_run_dates(Device.t(), Run.t(), integer()) :: Run.t()
def correct_run_dates(device, run, duration) do
recs = Repo.all(from(r in Recording, where: r.run_id == ^run.id, order_by: r.start_date))

Enum.each(recs, fn rec ->
start_date = DateTime.add(rec.start_date, duration, :microsecond)
end_date = DateTime.add(rec.end_date, duration, :microsecond)

changeset =
Ecto.Changeset.change(
rec,
%{
start_date: start_date,
end_date: end_date,
filename: "#{DateTime.to_unix(start_date, :microsecond)}.mp4"
}
)

new_rec = ExNVR.Repo.update!(changeset)

File.rename!(
ExNVR.Recordings.recording_path(device, rec),
ExNVR.Recordings.recording_path(device, new_rec)
)
end)

run
|> Run.changeset(%{
start_date: DateTime.add(run.start_date, duration, :microsecond),
end_date: DateTime.add(run.end_date, duration, :microsecond)
})
|> Repo.update!()
end

@spec delete_oldest_recordings(Device.t(), integer()) ::
:ok | {:error, Ecto.Changeset.t()}
def delete_oldest_recordings(device, limit) do
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule :"Elixir.ExNVR.Repo.Migrations.Add-run-id-to-recordings" do
use Ecto.Migration

def change do
alter table(:recordings) do
add :run_id, references(:runs)
end

create index(:recordings, [:run_id])
end
end
54 changes: 54 additions & 0 deletions apps/ex_nvr/test/ex_nvr/recordings/recordings_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,55 @@ defmodule ExNVR.RecordingTest do
end
end

test "correct run dates", %{device: device} do
start_date = ~U(2024-07-27 10:10:00.000000Z)
duration = 150_000_000

run =
run_fixture(device,
start_date: start_date,
end_date: DateTime.add(start_date, 5, :minute)
)

recs =
Enum.map(0..4, fn idx ->
recording_fixture(device,
run_id: run.id,
start_date: DateTime.add(start_date, idx, :minute),
end_date: DateTime.add(start_date, idx + 1, :minute)
)
end)

assert updated_run = Recordings.correct_run_dates(device, run, duration)

assert DateTime.compare(
updated_run.start_date,
DateTime.add(run.start_date, duration, :microsecond)
) == :eq

assert DateTime.compare(
updated_run.end_date,
DateTime.add(run.end_date, duration, :microsecond)
) == :eq

assert updated_recs = list_recordings(updated_run)

for {rec, updated_rec} <- Enum.zip(recs, updated_recs) do
assert DateTime.compare(
updated_rec.start_date,
DateTime.add(rec.start_date, duration, :microsecond)
) == :eq

assert DateTime.compare(
updated_rec.end_date,
DateTime.add(rec.end_date, duration, :microsecond)
) == :eq

refute File.exists?(Recordings.recording_path(device, rec))
assert File.exists?(Recordings.recording_path(device, updated_rec))
end
end

defp assert_files_deleted(device, stream_type, recordings, count) do
recordings_path =
recordings
Expand All @@ -156,4 +205,9 @@ defmodule ExNVR.RecordingTest do
run = Recordings.list_runs([device_id: device.id], stream_type) |> List.first()
assert DateTime.compare(run.start_date, date) == :eq
end

def list_recordings(run) do
from(r in Recording, where: r.run_id == ^run.id, order_by: r.start_date)
|> ExNVR.Repo.all()
end
end
Loading

0 comments on commit 8a975d4

Please sign in to comment.