How can I perform parallel asynchronous HTTP GET requests with reqwest?
Problem
The async example is useful, but being new to Rust and Tokio, I am struggling to work out how to do N requests at once, using URLs from a vector, and creating an iterator of the response HTML for each URL as a string.
How could this be done?
Solution
Concurrent requests
As of reqwest 0.11.14:
use futures::{stream, StreamExt}; // 0.3.27
use reqwest::Client; // 0.11.14
use tokio; // 1.26.0, features = ["macros"]
const CONCURRENT_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = stream::iter(urls)
.map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
})
.buffer_unordered(CONCURRENT_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
})
.await;
}
- stream::iter(urls) (stream::iter): take a collection of strings and convert it into a Stream.
- .map(|url| { ... }) (StreamExt::map): run an asynchronous function on every element in the stream and transform the element to a new type.
- let client = &client; async move { ... }: take an explicit reference to the Client and move the reference (not the original Client) into an anonymous asynchronous block.
- let resp = client.get(url).send().await?;: start an asynchronous GET request using the Client's connection pool and wait for the request.
- resp.bytes().await: request and wait for the bytes of the response.
- .buffer_unordered(CONCURRENT_REQUESTS); (StreamExt::buffer_unordered): convert a stream of futures into a stream of those futures' values, executing the futures concurrently.
- .for_each(|b| async { ... }).await; (StreamExt::for_each): convert the stream back into a single future, printing out the amount of data received along the way, then wait for the future to complete.
See also:
- Join futures with limited concurrency
- How to merge iterator of streams?
- How do I synchronously return a value calculated in an asynchronous Future?
- What is the difference between then, and_then and or_else in Rust futures?
Without bounded execution
If you wanted to, you could also convert an iterator into an iterator of futures and use future::join_all:
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = future::join_all(urls.into_iter().map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
}))
.await;
for b in bodies {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
}
I'd encourage using the first example as you usually want to limit the concurrency, which buffer and buffer_unordered help with.
Parallel requests
Concurrent requests are generally good enough, but there are times where you need parallel requests. In that case, you need to spawn a task.
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]
const PARALLEL_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let urls = vec!["https://api.ipify.org"; 2];
let client = Client::new();
let bodies = stream::iter(urls)
.map(|url| {
let client = client.clone();
tokio::spawn(async move {
let resp = client.get(url).send().await?;
resp.bytes().await
})
})
.buffer_unordered(PARALLEL_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(Ok(b)) => println!("Got {} bytes", b.len()),
Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
}
})
.await;
}
The primary differences are:
- We use tokio::spawn to perform work in separate tasks.
- We have to give each task its own reqwest::Client. As recommended, we clone a shared client to make use of the connection pool.
- There's an additional error case when the task cannot be joined.
See also:
- What is the difference between concurrent programming and parallel programming?
- What is the difference between concurrency and parallelism?
- What is the difference between concurrency, parallelism and asynchronous methods?
Context
Stack Overflow Q#51044467, score: 180