Skip to content

Commit

Permalink
Change query entrypoints to use Into<Query> instead of just Query (
Browse files Browse the repository at this point in the history
…#213)

* Change query entrypoints to use `Into<Query>` instead of just `Query`

* Change examples/tests to remove explicit calls to `query`
  • Loading branch information
knutwalker authored Jan 15, 2025
1 parent 345e2cc commit f18c3e8
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 43 deletions.
10 changes: 5 additions & 5 deletions lib/examples/concurrent_writes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::stream::{self, StreamExt, TryStreamExt};
use neo4rs::{query, ConfigBuilder, Graph};
use neo4rs::{ConfigBuilder, Graph};

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -35,7 +35,7 @@ async fn main() {

async fn work(i: u64, graph: Graph) -> (u64, u64, u64) {
graph
.run(query(
.run(
"
CREATE
(dan:Person {name: 'Dan'}),
Expand Down Expand Up @@ -77,12 +77,12 @@ CREATE
(elsa)-[:BUYS {amount: 3}]->(chocolate),
(elsa)-[:BUYS {amount: 3}]->(milk)
",
))
)
.await
.unwrap();

let node_count = graph
.execute(query("MATCH (n) RETURN count(n) AS count"))
.execute("MATCH (n) RETURN count(n) AS count")
.await
.unwrap()
.column_into_stream::<u64>("count")
Expand All @@ -91,7 +91,7 @@ CREATE
.unwrap();

let rel_count = graph
.execute(query("MATCH ()-[r]->() RETURN count(r) AS count"))
.execute("MATCH ()-[r]->() RETURN count(r) AS count")
.await
.unwrap()
.column_into_stream::<u64>("count")
Expand Down
34 changes: 20 additions & 14 deletions lib/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ impl Graph {
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run(&self, q: Query) -> Result<()> {
self.impl_run_on(self.config.db.clone(), q, Operation::Write)
pub async fn run(&self, q: impl Into<Query>) -> Result<()> {
self.impl_run_on(self.config.db.clone(), q.into(), Operation::Write)
.await
}

Expand All @@ -178,15 +178,16 @@ impl Graph {
pub async fn run_on(
&self,
db: impl Into<Database>,
q: Query,
q: impl Into<Query>,
operation: Operation,
) -> Result<()> {
self.impl_run_on(Some(db.into()), q, operation).await
self.impl_run_on(Some(db.into()), q.into(), operation).await
}

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
pub async fn run_on(&self, db: impl Into<Database>, q: Query) -> Result<()> {
self.impl_run_on(Some(db.into()), q, Operation::Write).await
pub async fn run_on(&self, db: impl Into<Database>, q: impl Into<Query>) -> Result<()> {
self.impl_run_on(Some(db.into()), q.into(), Operation::Write)
.await
}

#[allow(unused_variables)]
Expand Down Expand Up @@ -229,8 +230,8 @@ impl Graph {
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute(&self, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q, Operation::Write)
pub async fn execute(&self, q: impl Into<Query>) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q.into(), Operation::Write)
.await
}

Expand All @@ -240,8 +241,8 @@ impl Graph {
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute_read(&self, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q, Operation::Read)
pub async fn execute_read(&self, q: impl Into<Query>) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q.into(), Operation::Read)
.await
}

Expand All @@ -255,10 +256,11 @@ impl Graph {
pub async fn execute_on(
&self,
db: impl Into<Database>,
q: Query,
q: impl Into<Query>,
operation: Operation,
) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, operation).await
self.impl_execute_on(Some(db.into()), q.into(), operation)
.await
}

/// Executes a query on the provided database and returns a [`DetachedRowStream`]
Expand All @@ -267,8 +269,12 @@ impl Graph {
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
pub async fn execute_on(&self, db: impl Into<Database>, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, Operation::Write)
pub async fn execute_on(
&self,
db: impl Into<Database>,
q: impl Into<Query>,
) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q.into(), Operation::Write)
.await
}

Expand Down
8 changes: 4 additions & 4 deletions lib/src/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl Txn {
}

/// Runs a single query and discards the stream.
pub async fn run(&mut self, q: Query) -> Result<()> {
let mut query = q.clone();
pub async fn run(&mut self, q: impl Into<Query>) -> Result<()> {
let mut query = q.into();
if let Some(db) = self.db.as_ref() {
query = query.extra("db", db.to_string());
}
Expand All @@ -68,8 +68,8 @@ impl Txn {
}

/// Executes a query and returns a [`RowStream`]
pub async fn execute(&mut self, q: Query) -> Result<RowStream> {
let mut query = q.clone();
pub async fn execute(&mut self, q: impl Into<Query>) -> Result<RowStream> {
let mut query = q.into();
if let Some(db) = self.db.as_ref() {
query = query.extra("db", db.to_string());
}
Expand Down
21 changes: 6 additions & 15 deletions lib/tests/node_property_parsing.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, FixedOffset};
use neo4rs::{query, Node, Point2D, Point3D};
use neo4rs::{Node, Point2D, Point3D};

mod container;

Expand All @@ -9,31 +9,25 @@ async fn node_property_parsing() {
let graph = neo4j.graph();

graph
.run(query(
.run(
"CREATE
(:Datetime {p1:DATETIME('2024-12-31T08:10:35')}),
(:Point2D {a:Point ({x:2,y:3})}),
(:Point3D {a:Point ({x:3,y:4,z:5})})
",
))
)
.await
.unwrap();

let mut result = graph
.execute(query("MATCH (p:DateTime) RETURN p"))
.await
.unwrap();
let mut result = graph.execute("MATCH (p:DateTime) RETURN p").await.unwrap();

while let Ok(Some(row)) = result.next().await {
let node: Node = row.get("p").unwrap();
let p1 = node.get::<DateTime<FixedOffset>>("p1").unwrap();
assert_eq!(p1.timestamp(), 1735632635);
}

let mut result = graph
.execute(query("MATCH (p:Point2D) RETURN p"))
.await
.unwrap();
let mut result = graph.execute("MATCH (p:Point2D) RETURN p").await.unwrap();

while let Ok(Some(row)) = result.next().await {
let node: Node = row.get("p").unwrap();
Expand All @@ -42,10 +36,7 @@ async fn node_property_parsing() {
assert_eq!(p1.y(), 3.0);
}

let mut result = graph
.execute(query("MATCH (p:Point3D) RETURN p"))
.await
.unwrap();
let mut result = graph.execute("MATCH (p:Point3D) RETURN p").await.unwrap();

while let Ok(Some(row)) = result.next().await {
let node: Node = row.get("p").unwrap();
Expand Down
8 changes: 3 additions & 5 deletions lib/tests/use_default_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::TryStreamExt;
use neo4rs::*;
use neo4rs::{query, Operation};

mod container;

Expand Down Expand Up @@ -27,13 +27,11 @@ async fn use_default_db() {

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
let query_stream = graph
.execute_on("system", query("SHOW DEFAULT DATABASE"), Operation::Read)
.execute_on("system", "SHOW DEFAULT DATABASE", Operation::Read)
.await;

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
let query_stream = graph
.execute_on("system", query("SHOW DEFAULT DATABASE"))
.await;
let query_stream = graph.execute_on("system", "SHOW DEFAULT DATABASE").await;

let default_db = query_stream
.unwrap()
Expand Down

0 comments on commit f18c3e8

Please sign in to comment.