patternrustMinor
Asynchronous database query system using futures-rs in Rust
Viewed 0 times
asynchronousfuturesquerysystemdatabaseusingrust
Problem
Background
I've been working on an algorithmic trading platform in Rust and utilizing the newly released futures-rs library to keep everything asynchronous and non-blocking.
One issue I've come across with this is that the majority of database interface libraries currently available for Rust are blocking, which poses a large issue for my application since it's very heavily focused on speed and efficiency.
Description
To overcome this, I devised a system that my application can use to fire off database queries asynchronously. It works by starting up multiple threads on which Postgres connections are created. The system distributes new queries to available connections, handles to which are stored in a
If all the connections are busy, the query is stored in a different queue and popped out by the worker threads as they complete their previous queries. Both the connection handle queue and the query queue are held in
I've tested the system to ensure that it's indeed asynchronous, but I wanted to ask around and see if there was anything I could do to make it better either syntactically or performance-wise.
```
use std::collections::VecDeque;
use std::thread;
use std::sync::{Arc, Mutex};
use postgres;
use futures::stream::{Stream, channel, Sender, Receiver};
use futures::{Future, oneshot, Complete};
use transport::postgres::get_client;
// helper types to keep function declarations clean
type QueryError = postgres::error::Error;
type SenderQueue = Arc), ()>>>>;
type QueryQueue = Arc>>;
pub struct QueryServer {
conn_count: usize, // how many connections to open
query_queue: QueryQueue, // internal query queue
conn_queue: SenderQueue, // senders for idle query threads
}
// locks the QueryQueue and returns a queued query, if there are any.
fn try_get_new_query(query_queue: QueryQueue) -> Option {
let mut qq_inner = query_queue.lock().unwrap();
I've been working on an algorithmic trading platform in Rust and utilizing the newly released futures-rs library to keep everything asynchronous and non-blocking.
One issue I've come across with this is that the majority of database interface libraries currently available for Rust are blocking, which poses a large issue for my application since it's very heavily focused on speed and efficiency.
Description
To overcome this, I devised a system that my application can use to fire off database queries asynchronously. It works by starting up multiple threads on which Postgres connections are created. The system distributes new queries to available connections, handles to which are stored in a
VecDeque as Receiver objects.If all the connections are busy, the query is stored in a different queue and popped out by the worker threads as they complete their previous queries. Both the connection handle queue and the query queue are held in
Arc> objects to allow them to be accessed by different threads.I've tested the system to ensure that it's indeed asynchronous, but I wanted to ask around and see if there was anything I could do to make it better either syntactically or performance-wise.
```
use std::collections::VecDeque;
use std::thread;
use std::sync::{Arc, Mutex};
use postgres;
use futures::stream::{Stream, channel, Sender, Receiver};
use futures::{Future, oneshot, Complete};
use transport::postgres::get_client;
// helper types to keep function declarations clean
type QueryError = postgres::error::Error;
type SenderQueue = Arc), ()>>>>;
type QueryQueue = Arc>>;
pub struct QueryServer {
conn_count: usize, // how many connections to open
query_queue: QueryQueue, // internal query queue
conn_queue: SenderQueue, // senders for idle query threads
}
// locks the QueryQueue and returns a queued query, if there are any.
fn try_get_new_query(query_queue: QueryQueue) -> Option {
let mut qq_inner = query_queue.lock().unwrap();
Solution
Here are a few shallow comments on the code.
This code
looks like a verbose way of writing
The signature
means that it takes an
instead. The inner type is better labelled
This code
firstly has a very strange comment, since
Secondly, it'd be nicer to write as
I'd also write it as
since it also doesn't really make calling any harder and increases flexibility.
This is a totally optional style point, but IMO,
is nicer as
In
Personally, given that
This code
let mut qq_inner = query_queue.lock().unwrap();
// there is a queued query
if !qq_inner.is_empty() {
return Some(qq_inner.pop_front().unwrap())
}else{
// No new queries
return None
}looks like a verbose way of writing
query_queue.lock().unwrap().pop_front()The signature
fn try_get_new_query(query_queue: QueryQueue) -> Optionmeans that it takes an
Arc, but the code has no interest in ownership so should really befn try_get_new_query(query_queue: &Mutex>) -> Optioninstead. The inner type is better labelled
QueryQueue and the outer types are Arc and &QueryQueue. This gives your API more flexibility, and since Arc dereferences to a normal &-reference, this does not prevent any prior use-cases.This code
#[allow(unused_must_use)]
fn execute_query(query: String, client: &postgres::Connection) {
client.execute(query.as_str(), &[])
/*.map_err(|err| println!("Error saving tick: {:?}", err) )*/;
}firstly has a very strange comment, since
map_err should almost never involve println!. I could understand logging, but not if it throws away the error.Secondly, it'd be nicer to write as
fn execute_query(query: String, client: &postgres::Connection) {
let _ = client.execute(query.as_str(), &[]);
}I'd also write it as
fn execute_query(query: &str, client: &postgres::Connection) {
let _ = client.execute(query, &[]);
}since it also doesn't really make calling any harder and increases flexibility.
This is a totally optional style point, but IMO,
thread::spawn(move || { init_query_processor(rx, qq_copy) });is nicer as
thread::spawn(move || init_query_processor(rx, qq_copy));In
execute, there's another minor style point which is that }else{ should be } else {.Personally, given that
let (c, o) = oneshot::(); only uses c and o once, there's no great need to shorten them so much. YMMV.Code Snippets
let mut qq_inner = query_queue.lock().unwrap();
// there is a queued query
if !qq_inner.is_empty() {
return Some(qq_inner.pop_front().unwrap())
}else{
// No new queries
return None
}query_queue.lock().unwrap().pop_front()fn try_get_new_query(query_queue: QueryQueue) -> Option<String>fn try_get_new_query(query_queue: &Mutex<VecDeque<String>>) -> Option<String>#[allow(unused_must_use)]
fn execute_query(query: String, client: &postgres::Connection) {
client.execute(query.as_str(), &[])
/*.map_err(|err| println!("Error saving tick: {:?}", err) )*/;
}Context
StackExchange Code Review Q#139815, answer score: 3
Revisions (0)
No revisions yet.