HiveBrain v1.2.0

How can I perform parallel asynchronous HTTP GET requests with reqwest?


Problem

The async example is useful, but being new to Rust and Tokio, I am struggling to work out how to perform N requests at once, using URLs from a vector, and to create an iterator of the response HTML for each URL as a string.

How could this be done?

Solution

Concurrent requests

As of reqwest 0.11.14:
use futures::{stream, StreamExt}; // 0.3.27
use reqwest::Client; // 0.11.14
use tokio; // 1.26.0, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}


stream::iter(urls)

stream::iter: Take a collection of strings and convert it into a Stream.

.map(|url| {

StreamExt::map: Run an asynchronous function on every element in the stream, transforming each element to a new type.

let client = &client;
async move {

Take an explicit reference to the Client and move the reference (not the original Client) into an anonymous asynchronous block.

let resp = client.get(url).send().await?;

Start an asynchronous GET request using the Client's connection pool and wait for the request to complete.

resp.bytes().await

Request and wait for the bytes of the response.

.buffer_unordered(CONCURRENT_REQUESTS);

StreamExt::buffer_unordered: Convert a stream of futures into a stream of those futures' values, executing the futures concurrently.

bodies
    .for_each(|b| async {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    })
    .await;

StreamExt::for_each: Convert the stream back into a single future, printing out the amount of data received along the way, then wait for the future to complete.

See also:

  • Join futures with limited concurrency
  • How to merge iterator of streams?
  • How do I synchronously return a value calculated in an asynchronous Future?
  • What is the difference between then, and_then and or_else in Rust futures?

Without bounded execution

If you wanted to, you could also convert an iterator into an iterator of futures and use future::join_all:

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}


I'd encourage using the first example, as you usually want to limit the concurrency, which buffer and buffer_unordered help with.

Parallel requests

Concurrent requests are generally good enough, but there are times where you need parallel requests. In that case, you need to spawn a task.

use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::task::JoinError: {}", e),
            }
        })
        .await;
}


The primary differences are:

  • We use tokio::spawn to perform work in separate tasks.
  • We have to give each task its own reqwest::Client. As recommended, we clone a shared client to make use of the connection pool.
  • There's an additional error case when the task cannot be joined.



See also:

  • What is the difference between concurrent programming and parallel programming?
  • What is the difference between concurrency and parallelism?
  • What is the difference between concurrency, parallelism and asynchronous methods?

Context

Stack Overflow Q#51044467, score: 180
