Asynchronous database query system using futures-rs in Rust

Submitted by: @import:stackexchange-codereview··

Problem

Background

I've been working on an algorithmic trading platform in Rust and utilizing the newly released futures-rs library to keep everything asynchronous and non-blocking.

One issue I've come across with this is that the majority of database interface libraries currently available for Rust are blocking, which poses a large issue for my application since it's very heavily focused on speed and efficiency.

Description

To overcome this, I devised a system that my application can use to fire off database queries asynchronously. It works by starting up multiple threads on which Postgres connections are created. The system distributes new queries to available connections, handles to which are stored in a VecDeque as Receiver objects.

If all the connections are busy, the query is stored in a different queue and popped out by the worker threads as they complete their previous queries. Both the connection handle queue and the query queue are held in Arc<Mutex<VecDeque>> objects to allow them to be accessed by different threads.
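The dispatch scheme described above can be sketched with the standard library alone. This is a rough miniature under stated assumptions, not the actual implementation: it uses std::sync::mpsc channels instead of futures-rs, and MiniQueryServer, IdleQueue, worker, run_query and demo are hypothetical names, with run_query standing in for a blocking Postgres call.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type QueryQueue = Arc<Mutex<VecDeque<String>>>;
type IdleQueue = Arc<Mutex<VecDeque<Sender<String>>>>;

// Hypothetical stand-in for a blocking database call.
fn run_query(query: &str) -> String {
    format!("done: {}", query)
}

struct MiniQueryServer {
    pending: QueryQueue, // queries waiting for a free worker
    idle: IdleQueue,     // senders for workers that are currently idle
}

impl MiniQueryServer {
    // Spawns `workers` threads; each finished query is reported on `results`.
    fn new(workers: usize, results: Sender<String>) -> MiniQueryServer {
        let pending: QueryQueue = Arc::new(Mutex::new(VecDeque::new()));
        let idle: IdleQueue = Arc::new(Mutex::new(VecDeque::new()));
        for _ in 0..workers {
            let (tx, rx) = channel::<String>();
            idle.lock().unwrap().push_back(tx.clone());
            let (pending, idle, results) = (pending.clone(), idle.clone(), results.clone());
            thread::spawn(move || worker(rx, tx, pending, idle, results));
        }
        MiniQueryServer { pending, idle }
    }

    // Hands the query to an idle worker, or queues it if all workers are busy.
    fn execute(&self, query: String) {
        // Both sides lock `pending` first and `idle` second, so a worker
        // cannot go idle between the check and the push below.
        let mut pending = self.pending.lock().unwrap();
        let idle_worker = self.idle.lock().unwrap().pop_front();
        match idle_worker {
            Some(tx) => tx.send(query).unwrap(),
            None => pending.push_back(query),
        }
    }
}

// Runs queries received on `rx`, then drains `pending` before going idle again.
fn worker(
    rx: Receiver<String>,
    tx: Sender<String>,
    pending: QueryQueue,
    idle: IdleQueue,
    results: Sender<String>,
) {
    for mut query in rx {
        loop {
            let _ = results.send(run_query(&query));
            let mut pending = pending.lock().unwrap();
            match pending.pop_front() {
                Some(next) => query = next,
                None => {
                    // Nothing queued: advertise this worker as idle.
                    idle.lock().unwrap().push_back(tx.clone());
                    break;
                }
            }
        }
    }
}

// Fires five queries at two workers and collects the results in sorted order.
fn demo() -> Vec<String> {
    let (results_tx, results_rx) = channel();
    let server = MiniQueryServer::new(2, results_tx);
    for i in 0..5 {
        server.execute(format!("query {}", i));
    }
    let mut finished: Vec<String> = (0..5).map(|_| results_rx.recv().unwrap()).collect();
    finished.sort();
    finished
}
```

The workers in this sketch are never shut down; they simply block on their channel until the process exits, which is enough to show the hand-off between the two queues.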

I've tested the system to ensure that it's indeed asynchronous, but I wanted to ask around and see if there was anything I could do to make it better either syntactically or performance-wise.

```
use std::collections::VecDeque;
use std::thread;
use std::sync::{Arc, Mutex};

use postgres;
use futures::stream::{Stream, channel, Sender, Receiver};
use futures::{Future, oneshot, Complete};

use transport::postgres::get_client;

// helper types to keep function declarations clean
type QueryError = postgres::error::Error;
type SenderQueue = Arc<Mutex<VecDeque<Sender<(String, Complete<()>), ()>>>>;
type QueryQueue = Arc<Mutex<VecDeque<String>>>;

pub struct QueryServer {
    conn_count: usize, // how many connections to open
    query_queue: QueryQueue, // internal query queue
    conn_queue: SenderQueue, // senders for idle query threads
}

// locks the QueryQueue and returns a queued query, if there are any.
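fn try_get_new_query(query_queue: QueryQueue) -> Option<String> {
    let mut qq_inner = query_queue.lock().unwrap();
    // there is a queued query
    if !qq_inner.is_empty() {
        return Some(qq_inner.pop_front().unwrap())
    }else{
        // No new queries
        return None
    }
}
```
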
Solution

Here are a few shallow comments on the code.

This code

let mut qq_inner = query_queue.lock().unwrap();
// there is a queued query
if !qq_inner.is_empty() {
    return Some(qq_inner.pop_front().unwrap())
}else{
    // No new queries
    return None
}


looks like a verbose way of writing

query_queue.lock().unwrap().pop_front()
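
As a quick check of that claim, here is a small sketch that runs both forms against identical queues. pop_front already returns None on an empty VecDeque, so the explicit is_empty test adds nothing. The names try_get_new_query_verbose and demo are made up for this comparison.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

type QueryQueue = Arc<Mutex<VecDeque<String>>>;

// The spelled-out version from the question.
fn try_get_new_query_verbose(query_queue: QueryQueue) -> Option<String> {
    let mut qq_inner = query_queue.lock().unwrap();
    if !qq_inner.is_empty() {
        return Some(qq_inner.pop_front().unwrap());
    } else {
        return None;
    }
}

// The one-line version: `pop_front` yields `None` when the queue is empty.
fn try_get_new_query(query_queue: QueryQueue) -> Option<String> {
    query_queue.lock().unwrap().pop_front()
}

// Drains two identical queues, one with each function, asking once more
// than there are elements so the empty case is covered too.
fn demo() -> (Vec<Option<String>>, Vec<Option<String>>) {
    let make = || -> QueryQueue {
        Arc::new(Mutex::new(VecDeque::from(vec![
            "a".to_string(),
            "b".to_string(),
        ])))
    };
    let (q1, q2) = (make(), make());
    let verbose = (0..3).map(|_| try_get_new_query_verbose(q1.clone())).collect();
    let short = (0..3).map(|_| try_get_new_query(q2.clone())).collect();
    (verbose, short)
}
```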


The signature

fn try_get_new_query(query_queue: QueryQueue) -> Option<String>


means that it takes an Arc, but the code has no interest in ownership so should really be

fn try_get_new_query(query_queue: &Mutex<VecDeque<String>>) -> Option<String>


instead. The inner type is better labelled QueryQueue and the outer types are Arc<QueryQueue> and &QueryQueue. This gives your API more flexibility, and since Arc<T> dereferences to a normal &-reference, this does not prevent any prior use-cases.
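
To illustrate the flexibility point, here is a minimal sketch in which the alias names the inner type and the function borrows it. The same function then accepts a queue behind an Arc as well as a plain local one. The demo function and the query strings are only for illustration.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// The alias now names the inner type; callers decide how to share it.
type QueryQueue = Mutex<VecDeque<String>>;

fn try_get_new_query(query_queue: &QueryQueue) -> Option<String> {
    query_queue.lock().unwrap().pop_front()
}

fn demo() -> (Option<String>, Option<String>) {
    // Shared queue, as in the threaded server.
    let shared: Arc<QueryQueue> = Arc::new(Mutex::new(VecDeque::new()));
    shared.lock().unwrap().push_back("SELECT 1".to_string());
    // `&Arc<QueryQueue>` coerces to `&QueryQueue` through `Deref`.
    let from_arc = try_get_new_query(&shared);

    // A queue that is not behind an `Arc` at all, e.g. in a unit test.
    let local: QueryQueue = Mutex::new(VecDeque::from(vec!["SELECT 2".to_string()]));
    let from_local = try_get_new_query(&local);

    (from_arc, from_local)
}
```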

This code

#[allow(unused_must_use)]
fn execute_query(query: String, client: &postgres::Connection) {
    client.execute(query.as_str(), &[])
        /*.map_err(|err| println!("Error saving tick: {:?}", err) )*/;
}


firstly has a very strange comment, since map_err should almost never involve println!. I could understand logging, but not if it throws away the error.
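
If logging is what's wanted, one possible shape is to log inside map_err and hand the same error back, so nothing is thrown away. This is only a sketch: log_error and fake_execute are hypothetical helpers, with fake_execute standing in for a fallible call such as client.execute.

```rust
use std::fmt::Debug;

// Logs a failed result and returns it unchanged, so the caller still sees the error.
fn log_error<T, E: Debug>(result: Result<T, E>) -> Result<T, E> {
    result.map_err(|err| {
        eprintln!("Error executing query: {:?}", err);
        err // hand the error back instead of replacing it with ()
    })
}

// Hypothetical stand-in for a fallible database call.
fn fake_execute(query: &str) -> Result<u64, String> {
    if query.is_empty() {
        Err("empty query".to_string())
    } else {
        Ok(1)
    }
}
```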

Secondly, it'd be nicer to write as

fn execute_query(query: String, client: &postgres::Connection) {
    let _ = client.execute(query.as_str(), &[]);
}


I'd also write it as

fn execute_query(query: &str, client: &postgres::Connection) {
    let _ = client.execute(query, &[]);
}


since it also doesn't really make calling any harder and increases flexibility.
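
A small sketch of what the &str signature buys at the call site. FakeConnection is a hypothetical stand-in for postgres::Connection that just records the queries it is given, so the example runs without a database.

```rust
use std::cell::RefCell;

// Hypothetical stand-in for `postgres::Connection`: records queries instead of running them.
struct FakeConnection {
    executed: RefCell<Vec<String>>,
}

impl FakeConnection {
    fn execute(&self, query: &str) -> Result<u64, String> {
        self.executed.borrow_mut().push(query.to_string());
        Ok(0) // pretend zero rows were affected
    }
}

fn execute_query(query: &str, client: &FakeConnection) {
    let _ = client.execute(query);
}

fn demo() -> Vec<String> {
    let client = FakeConnection { executed: RefCell::new(Vec::new()) };
    let owned = String::from("DELETE FROM ticks");
    execute_query(&owned, &client); // `&String` coerces to `&str`
    execute_query("SELECT 1", &client); // string literal, no allocation needed
    execute_query(&owned[..6], &client); // a sub-slice works too
    client.executed.into_inner()
}
```

Callers that already hold a String only need to add an &, while callers with a literal or a slice no longer have to allocate one.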

This is a totally optional style point, but IMO,

thread::spawn(move || { init_query_processor(rx, qq_copy) });


is nicer as

thread::spawn(move || init_query_processor(rx, qq_copy));


In execute, there's another minor style point which is that }else{ should be } else {.

Personally, given that let (c, o) = oneshot::<()>(); only uses c and o once, there's no great need to shorten them so much. YMMV.

Context

StackExchange Code Review Q#139815, answer score: 3
