Skip to content

Commit

Permalink
[Bifrost] Move Record to restate-types
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Sep 5, 2024
1 parent 36f1d6a commit a30d2c2
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 165 deletions.
3 changes: 1 addition & 2 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ use tracing::{debug, info, instrument};
use restate_types::config::Configuration;
use restate_types::live::Live;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{LogId, Lsn};
use restate_types::logs::{LogId, Lsn, Record};
use restate_types::retries::RetryIter;

use crate::bifrost::BifrostInner;
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::record::Record;
use crate::{Error, InputRecord, Result};

#[derive(Clone, derive_more::Debug)]
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/background_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::Arc;

use futures::FutureExt;
use pin_project::pin_project;
use restate_types::logs::Record;
use tokio::sync::{mpsc, oneshot, Notify};
use tracing::{trace, warn};

Expand All @@ -20,7 +21,6 @@ use restate_types::identifiers::PartitionId;
use restate_types::storage::StorageEncode;

use crate::error::EnqueueError;
use crate::record::Record;
use crate::{Appender, InputRecord, Result};

/// Performs appends in the background concurrently while maintaining the order of records
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use bifrost::Bifrost;
pub use bifrost_admin::BifrostAdmin;
pub use error::{Error, Result};
pub use read_stream::LogReadStream;
pub use record::{Header, InputRecord, LogEntry, Record};
pub use record::{InputRecord, LogEntry};
pub use service::BifrostService;
pub use types::*;

Expand Down
3 changes: 1 addition & 2 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ use std::task::{ready, Poll};
use async_trait::async_trait;
use futures::{FutureExt, Stream};

use restate_types::logs::{KeyFilter, LogletOffset};
use restate_types::logs::{KeyFilter, LogletOffset, Record};

use crate::record::Record;
use crate::LogEntry;
use crate::{Result, TailState};

Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use tracing::instrument;

use restate_core::ShutdownError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::Record;
use restate_types::logs::{KeyFilter, LogletOffset, Lsn, SequenceNumber};

use crate::loglet::{AppendError, Loglet, OperationError, SendableLogletReadStream};
use crate::record::Record;
use crate::{Commit, LogEntry, LsnExt};
use crate::{Result, TailState};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
use restate_rocksdb::{IoMode, Priority, RocksDb};
use restate_types::config::LocalLogletOptions;
use restate_types::live::BoxedLiveLoad;
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::logs::{LogletOffset, Record, SequenceNumber};

use super::keys::{MetadataKey, MetadataKind, RecordKey};
use super::log_state::LogStateUpdates;
Expand All @@ -34,7 +34,6 @@ use super::metric_definitions::{
};
use super::record_format::{encode_record_and_split, FORMAT_FOR_NEW_APPENDS};
use crate::loglet::OperationError;
use crate::record::Record;

type Ack = oneshot::Sender<Result<(), OperationError>>;
type AckRecv = oneshot::Receiver<Result<(), OperationError>>;
Expand Down
3 changes: 1 addition & 2 deletions crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::sync::Mutex;
use tracing::{debug, warn};

use restate_core::ShutdownError;
use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber};
use restate_types::logs::{KeyFilter, LogletOffset, Record, SequenceNumber};

use self::log_store::LogStoreError;
use self::log_store::RocksDbLogStore;
Expand All @@ -41,7 +41,6 @@ use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStre
use crate::providers::local_loglet::metric_definitions::{
BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH,
};
use crate::record::Record;
use crate::{Result, TailState};

#[derive(derive_more::Debug)]
Expand Down
14 changes: 4 additions & 10 deletions crates/bifrost/src/providers/local_loglet/record_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ use std::ops::Deref;
use bytes::{Buf, BufMut, Bytes, BytesMut};

use restate_types::flexbuffers_storage_encode_decode;
use restate_types::logs::{KeyFilter, Keys, MatchKeyQuery};
use restate_types::logs::{KeyFilter, Keys, MatchKeyQuery, Record};
use restate_types::storage::{PolyBytes, StorageCodec, StorageCodecKind, StorageDecodeError};
use restate_types::time::NanosSinceEpoch;

use crate::{Header, Record};

// use legacy for new appends until enough minor/major versions are released after current (1.0.x)
// to allow for backwards compatibility.
pub(super) const FORMAT_FOR_NEW_APPENDS: RecordFormat = RecordFormat::Legacy;
Expand Down Expand Up @@ -106,9 +104,7 @@ pub(super) fn decode_and_filter_record(
let internal_payload: LegacyPayload = StorageCodec::decode(&mut buffer)?;
let record = if internal_payload.keys.matches_key_query(filter) {
Some(Record::from_parts(
Header {
created_at: internal_payload.header.created_at,
},
internal_payload.header.created_at,
internal_payload.keys,
PolyBytes::Bytes(internal_payload.body),
))
Expand Down Expand Up @@ -210,7 +206,7 @@ fn decode_custom_encoded_record(
let created_at = NanosSinceEpoch::from(read_created_at(&mut buffer));
let body = PolyBytes::Bytes(Bytes::copy_from_slice(buffer.chunk()));

Ok(Some(Record::from_parts(Header { created_at }, keys, body)))
Ok(Some(Record::from_parts(created_at, keys, body)))
}

// Reads KeyStyle and extract the keys from the buffer
Expand Down Expand Up @@ -284,9 +280,7 @@ mod tests {
fn test_codec_compatibility() -> googletest::Result<()> {
// ensure that we can encode and decode both the old and new formats
let record = Record::from_parts(
crate::Header {
created_at: NanosSinceEpoch::from(100),
},
NanosSinceEpoch::from(100),
Keys::Single(14),
PolyBytes::Typed(Arc::new("hello".to_owned())),
);
Expand Down
157 changes: 13 additions & 144 deletions crates/bifrost/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@ use core::str;
use std::marker::PhantomData;
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use restate_types::logs::{BodyWithKeys, HasRecordKeys, Keys, Lsn};
use restate_types::logs::{KeyFilter, LogletOffset, MatchKeyQuery};
use restate_types::storage::{
PolyBytes, StorageCodec, StorageDecode, StorageDecodeError, StorageEncode,
};
use restate_types::logs::LogletOffset;
use restate_types::logs::{BodyWithKeys, HasRecordKeys, Keys, Lsn, Record};
use restate_types::storage::{PolyBytes, StorageDecode, StorageDecodeError, StorageEncode};
use restate_types::time::NanosSinceEpoch;

use crate::LsnExt;
Expand Down Expand Up @@ -118,129 +114,6 @@ impl<S: Copy> LogEntry<S> {
}
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Header {
pub created_at: NanosSinceEpoch,
}

impl Default for Header {
fn default() -> Self {
Self {
created_at: NanosSinceEpoch::now(),
}
}
}

#[derive(Debug, Clone)]
pub struct Record {
header: Header,
body: PolyBytes,
keys: Keys,
}

impl Record {
pub fn from_parts(header: Header, keys: Keys, body: PolyBytes) -> Self {
Self { header, keys, body }
}

pub fn created_at(&self) -> NanosSinceEpoch {
self.header.created_at
}

pub fn keys(&self) -> &Keys {
&self.keys
}

pub fn body(&self) -> &PolyBytes {
&self.body
}

pub fn dissolve(self) -> (Header, PolyBytes, Keys) {
(self.header, self.body, self.keys)
}

/// Decode the record body into an owned value T.
///
/// Internally, this will clone the inner value if it's already in record cache, or will move
/// the value from the underlying Arc delivered from the loglet. Use this approach if you need
/// to mutate the value in-place and the cost of cloning sections is high. It's generally
/// recommended to use `decode_arc` whenever possible for large payloads.
pub fn decode<T: StorageDecode + StorageEncode + Clone>(self) -> Result<T, StorageDecodeError> {
let decoded = match self.body {
PolyBytes::Bytes(slice) => {
let mut buf = std::io::Cursor::new(slice);
StorageCodec::decode(&mut buf)?
}
PolyBytes::Typed(value) => {
let target_arc: Arc<T> = value.downcast_arc().map_err(|_| {
StorageDecodeError::DecodeValue(
anyhow::anyhow!(
"Type mismatch. Original value in PolyBytes::Typed does not match requested type"
)
.into(),
)})?;
// Attempts to move the inner value (T) if this Arc has exactly one strong
// reference. Otherwise, it clones the inner value.
match Arc::try_unwrap(target_arc) {
Ok(value) => value,
Err(value) => value.as_ref().clone(),
}
}
};
Ok(decoded)
}

/// Decode the record body into an Arc<T>. This is the most efficient way to access the entry
/// if you need read-only access or if it's acceptable to selectively clone inner sections. If
/// the record is in record cache, this will avoid cloning or deserialization of the value.
pub fn decode_arc<T: StorageDecode + StorageEncode>(
self,
) -> Result<Arc<T>, StorageDecodeError> {
let decoded = match self.body {
PolyBytes::Bytes(slice) => {
let mut buf = std::io::Cursor::new(slice);
Arc::new(StorageCodec::decode(&mut buf)?)
}
PolyBytes::Typed(value) => {
value.downcast_arc().map_err(|_| {
StorageDecodeError::DecodeValue(
anyhow::anyhow!(
"Type mismatch. Original value in PolyBytes::Typed does not match requested type"
)
.into(),
)})?
},
};
Ok(decoded)
}
}

impl MatchKeyQuery for Record {
fn matches_key_query(&self, query: &KeyFilter) -> bool {
self.keys.matches_key_query(query)
}
}

impl From<String> for Record {
fn from(value: String) -> Self {
Record {
header: Header::default(),
keys: Keys::None,
body: PolyBytes::Typed(Arc::new(value)),
}
}
}

impl From<&str> for Record {
fn from(value: &str) -> Self {
Record {
header: Header::default(),
keys: Keys::None,
body: PolyBytes::Typed(Arc::new(value.to_owned())),
}
}
}

#[derive(Debug, derive_more::IsVariant)]
enum MaybeRecord<S = Lsn> {
TrimGap(TrimGap<S>),
Expand All @@ -254,7 +127,7 @@ struct TrimGap<S> {
}

pub struct InputRecord<T> {
header: Header,
created_at: NanosSinceEpoch,
keys: Keys,
body: Arc<dyn StorageEncode>,
_phantom: PhantomData<T>,
Expand All @@ -263,7 +136,7 @@ pub struct InputRecord<T> {
impl<T> Clone for InputRecord<T> {
fn clone(&self) -> Self {
Self {
header: self.header.clone(),
created_at: self.created_at,
keys: self.keys.clone(),
body: Arc::clone(&self.body),
_phantom: self._phantom,
Expand All @@ -275,33 +148,29 @@ impl<T> Clone for InputRecord<T> {
// layout is identical.
impl<T: StorageEncode> InputRecord<T> {
pub(crate) fn into_record(self) -> Record {
Record {
header: self.header,
keys: self.keys,
body: PolyBytes::Typed(self.body),
}
Record::from_parts(self.created_at, self.keys, PolyBytes::Typed(self.body))
}
}

impl<T: StorageEncode> InputRecord<T> {
pub fn from_parts(header: Header, keys: Keys, body: Arc<T>) -> Self {
pub fn from_parts(created_at: NanosSinceEpoch, keys: Keys, body: Arc<T>) -> Self {
Self {
header,
created_at,
keys,
body,
_phantom: PhantomData,
}
}

pub fn created_at(&self) -> NanosSinceEpoch {
self.header.created_at
self.created_at
}
}

impl<T: StorageEncode + HasRecordKeys> From<Arc<T>> for InputRecord<T> {
fn from(val: Arc<T>) -> Self {
InputRecord {
header: Header::default(),
created_at: NanosSinceEpoch::now(),
keys: val.record_keys(),
body: val,
_phantom: PhantomData,
Expand All @@ -312,7 +181,7 @@ impl<T: StorageEncode + HasRecordKeys> From<Arc<T>> for InputRecord<T> {
impl From<String> for InputRecord<String> {
fn from(val: String) -> Self {
InputRecord {
header: Header::default(),
created_at: NanosSinceEpoch::now(),
keys: Keys::None,
body: Arc::new(val),
_phantom: PhantomData,
Expand All @@ -323,7 +192,7 @@ impl From<String> for InputRecord<String> {
impl From<&str> for InputRecord<String> {
fn from(val: &str) -> Self {
InputRecord {
header: Header::default(),
created_at: NanosSinceEpoch::now(),
keys: Keys::None,
body: Arc::new(String::from(val)),
_phantom: PhantomData,
Expand All @@ -334,7 +203,7 @@ impl From<&str> for InputRecord<String> {
impl<T: StorageEncode> From<BodyWithKeys<T>> for InputRecord<T> {
fn from(val: BodyWithKeys<T>) -> Self {
InputRecord {
header: Header::default(),
created_at: NanosSinceEpoch::now(),
keys: val.record_keys(),
body: Arc::new(val.into_inner()),
_phantom: PhantomData,
Expand Down
2 changes: 2 additions & 0 deletions crates/types/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use crate::storage::StorageEncode;

pub mod builder;
pub mod metadata;
mod record;
pub use record::Record;

#[derive(
Debug,
Expand Down
Loading

0 comments on commit a30d2c2

Please sign in to comment.