<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><meta name="generator" content="rustdoc"><meta name="description" content="Waits on multiple concurrent branches, returning when the first branch completes, cancelling the remaining branches."><title>select in tokio - Rust</title><script>if(window.location.protocol!=="file:")document.head.insertAdjacentHTML("beforeend","SourceSerif4-Regular-6b053e98.ttf.woff2,FiraSans-Italic-81dc35de.woff2,FiraSans-Regular-0fe48ade.woff2,FiraSans-MediumItalic-ccf7e434.woff2,FiraSans-Medium-e1aa3f0a.woff2,SourceCodePro-Regular-8badfe75.ttf.woff2,SourceCodePro-Semibold-aa29a496.ttf.woff2".split(",").map(f=>`<link rel="preload" as="font" type="font/woff2"href="../static.files/${f}">`).join(""))</script><link rel="stylesheet" href="../static.files/normalize-9960930a.css"><link rel="stylesheet" href="../static.files/rustdoc-ca0dd0c4.css"><meta name="rustdoc-vars" data-root-path="../" data-static-root-path="../static.files/" data-current-crate="tokio" data-themes="" data-resource-suffix="" data-rustdoc-version="1.93.1 (01f6ddf75 2026-02-11) (Arch Linux rust 1:1.93.1-1)" data-channel="1.93.1" data-search-js="search-9e2438ea.js" data-stringdex-js="stringdex-a3946164.js" data-settings-js="settings-c38705f0.js" ><script src="../static.files/storage-e2aeef58.js"></script><script defer src="sidebar-items.js"></script><script defer src="../static.files/main-a410ff4d.js"></script><noscript><link rel="stylesheet" href="../static.files/noscript-263c88ec.css"></noscript><link rel="alternate icon" type="image/png" href="../static.files/favicon-32x32-eab170b8.png"><link rel="icon" type="image/svg+xml" href="../static.files/favicon-044be391.svg"></head><body class="rustdoc macro"><!--[if lte IE 11]><div class="warning">This old browser is unsupported and will most likely display funky things.</div><![endif]--><rustdoc-topbar><h2><a 
href="#">select</a></h2></rustdoc-topbar><nav class="sidebar"><div class="sidebar-crate"><h2><a href="../tokio/index.html">tokio</a><span class="version">1.49.0</span></h2></div><div class="sidebar-elems"><section id="rustdoc-toc"><h2 class="location"><a href="#">select</a></h2><h3><a href="#">Sections</a></h3><ul class="block top-toc"><li><a href="#runtime-characteristics" title="Runtime characteristics">Runtime characteristics</a></li><li><a href="#fairness" title="Fairness">Fairness</a></li><li><a href="#panics" title="Panics">Panics</a></li><li><a href="#cancellation-safety" title="Cancellation safety">Cancellation safety</a></li><li><a href="#examples" title="Examples">Examples</a><ul><li><a href="#avoid-racy-if-preconditions" title="Avoid racy `if` preconditions">Avoid racy <code>if</code> preconditions</a></li></ul></li><li><a href="#alternatives-from-the-ecosystem" title="Alternatives from the Ecosystem">Alternatives from the Ecosystem</a><ul><li><a href="#merging-streams" title="Merging Streams">Merging Streams</a></li><li><a href="#racing-futures" title="Racing Futures">Racing Futures</a></li></ul></li></ul></section><div id="rustdoc-modnav"><h2 class="in-crate"><a href="index.html">In crate tokio</a></h2></div></div></nav><div class="sidebar-resizer" title="Drag to resize sidebar"></div><main><div class="width-limiter"><section id="main-content" class="content"><div class="main-heading"><div class="rustdoc-breadcrumbs"><a href="index.html">tokio</a></div><h1>Macro <span class="macro">select</span> <button id="copy-path" title="Copy item path to clipboard">Copy item path</button></h1><rustdoc-toolbar></rustdoc-toolbar><span class="sub-heading"><a class="src" href="../src/tokio/macros/select.rs.html#554-568">Source</a> </span></div><pre class="rust item-decl"><code>macro_rules! select {
    {
        $(
            biased;
        )?
        $(
            $bind:pat = $fut:expr $(, if $cond:expr)? => $handler:expr,
        )*
        $(
            else => $els:expr $(,)?
        )?
    } => { ... };
}</code></pre><details class="toggle top-doc" open><summary class="hideme"><span>Expand description</span></summary><div class="docblock"><p>Waits on multiple concurrent branches, returning when the <strong>first</strong> branch
completes, cancelling the remaining branches.</p>
<p>The <code>select!</code> macro must be used inside async functions, closures, and
blocks.</p>
<p>The <code>select!</code> macro accepts one or more branches with the following pattern:</p>
<div class="example-wrap"><pre class="language-text"><code>&lt;pattern&gt; = &lt;async expression&gt; (, if &lt;precondition&gt;)? => &lt;handler&gt;,</code></pre></div>
<p>Additionally, the <code>select!</code> macro may include a single, optional <code>else</code>
branch, which evaluates if none of the other branches match their patterns:</p>
<div class="example-wrap"><pre class="language-text"><code>else => &lt;expression&gt;</code></pre></div>
<p>The macro aggregates all <code>&lt;async expression&gt;</code> expressions and runs them
concurrently on the <strong>current</strong> task. Once the <strong>first</strong> expression
completes with a value that matches its <code>&lt;pattern&gt;</code>, the <code>select!</code> macro
returns the result of evaluating the completed branch’s <code>&lt;handler&gt;</code>
expression.</p>
<p>Additionally, each branch may include an optional <code>if</code> precondition. If the
precondition returns <code>false</code>, then the branch is disabled. The provided
<code>&lt;async expression&gt;</code> is still evaluated but the resulting future is never
polled. This capability is useful when using <code>select!</code> within a loop.</p>
<p>The complete lifecycle of a <code>select!</code> expression is as follows:</p>
<ol>
<li>Evaluate all provided <code>&lt;precondition&gt;</code> expressions. If the precondition
returns <code>false</code>, disable the branch for the remainder of the current call
to <code>select!</code>. Re-entering <code>select!</code> due to a loop clears the “disabled”
state.</li>
<li>Aggregate the <code>&lt;async expression&gt;</code>s from each branch, including the
disabled ones. If the branch is disabled, <code>&lt;async expression&gt;</code> is still
evaluated, but the resulting future is not polled.</li>
<li>If <strong>all</strong> branches are disabled: go to step 6.</li>
<li>Concurrently await the results of all remaining <code>&lt;async expression&gt;</code>s.</li>
<li>Once an <code>&lt;async expression&gt;</code> returns a value, attempt to apply the value to the
provided <code>&lt;pattern&gt;</code>. If the pattern matches, evaluate the <code>&lt;handler&gt;</code> and return.
If the pattern <strong>does not</strong> match, disable the current branch for the remainder of
the current call to <code>select!</code>. Continue from step 3.</li>
<li>Evaluate the <code>else</code> expression. If no else expression is provided, panic.</li>
</ol>
<h2 id="runtime-characteristics"><a class="doc-anchor" href="#runtime-characteristics">§</a>Runtime characteristics</h2>
<p>By running all async expressions on the current task, the expressions are
able to run <strong>concurrently</strong> but not in <strong>parallel</strong>. This means all
expressions run on the same thread, and if one branch blocks the thread,
no other expression can make progress. If parallelism is
required, spawn each async expression using <a href="task/fn.spawn.html" title="fn tokio::task::spawn"><code>tokio::spawn</code></a> and pass the
join handle to <code>select!</code>.</p>
<h2 id="fairness"><a class="doc-anchor" href="#fairness">§</a>Fairness</h2>
<p>By default, <code>select!</code> randomly picks a branch to check first. This provides
some level of fairness when calling <code>select!</code> in a loop with branches that
are always ready.</p>
<p>This behavior can be overridden by adding <code>biased;</code> to the beginning of the
macro usage. See the examples for details. This will cause <code>select!</code> to poll
the futures in the order they appear from top to bottom. There are a few
reasons you may want this:</p>
<ul>
<li>The random number generation of <code>tokio::select!</code> has a non-zero CPU cost</li>
<li>Your futures may interact in a way where known polling order is significant</li>
</ul>
<p>However, this mode comes with an important caveat: it becomes your responsibility
to ensure that the polling order of your futures is fair. If, for example, you
are selecting between a stream and a shutdown future, and the stream has a
huge volume of messages and zero or nearly zero time between them, you should
place the shutdown future earlier in the <code>select!</code> list to ensure that it is
always polled, and will not be ignored due to the stream being constantly
ready.</p>
<h2 id="panics"><a class="doc-anchor" href="#panics">§</a>Panics</h2>
<p>The <code>select!</code> macro panics if all branches are disabled <strong>and</strong> there is no
provided <code>else</code> branch. A branch is disabled when the provided <code>if</code>
precondition returns <code>false</code> <strong>or</strong> when the pattern does not match the
result of <code>&lt;async expression&gt;</code>.</p>
<h2 id="cancellation-safety"><a class="doc-anchor" href="#cancellation-safety">§</a>Cancellation safety</h2>
<p>When using <code>select!</code> in a loop to receive messages from multiple sources,
you should make sure that the receive call is cancellation safe to avoid
losing messages. This section goes through various common methods and
describes whether they are cancellation safe. The lists in this section are not
exhaustive.</p>
<p>The following methods are cancellation safe:</p>
<ul>
<li><a href="sync/mpsc/struct.Receiver.html#method.recv" title="method tokio::sync::mpsc::Receiver::recv"><code>tokio::sync::mpsc::Receiver::recv</code></a></li>
<li><a href="sync/mpsc/struct.UnboundedReceiver.html#method.recv" title="method tokio::sync::mpsc::UnboundedReceiver::recv"><code>tokio::sync::mpsc::UnboundedReceiver::recv</code></a></li>
<li><a href="sync/broadcast/struct.Receiver.html#method.recv" title="method tokio::sync::broadcast::Receiver::recv"><code>tokio::sync::broadcast::Receiver::recv</code></a></li>
<li><a href="sync/watch/struct.Receiver.html#method.changed" title="method tokio::sync::watch::Receiver::changed"><code>tokio::sync::watch::Receiver::changed</code></a></li>
<li><a href="net/struct.TcpListener.html#method.accept" title="method tokio::net::TcpListener::accept"><code>tokio::net::TcpListener::accept</code></a></li>
<li><a href="net/struct.UnixListener.html#method.accept" title="method tokio::net::UnixListener::accept"><code>tokio::net::UnixListener::accept</code></a></li>
<li><a href="signal/unix/struct.Signal.html#method.recv" title="method tokio::signal::unix::Signal::recv"><code>tokio::signal::unix::Signal::recv</code></a></li>
<li><a href="io/trait.AsyncReadExt.html#method.read" title="method tokio::io::AsyncReadExt::read"><code>tokio::io::AsyncReadExt::read</code></a> on any <code>AsyncRead</code></li>
<li><a href="io/trait.AsyncReadExt.html#method.read_buf" title="method tokio::io::AsyncReadExt::read_buf"><code>tokio::io::AsyncReadExt::read_buf</code></a> on any <code>AsyncRead</code></li>
<li><a href="io/trait.AsyncWriteExt.html#method.write" title="method tokio::io::AsyncWriteExt::write"><code>tokio::io::AsyncWriteExt::write</code></a> on any <code>AsyncWrite</code></li>
<li><a href="io/trait.AsyncWriteExt.html#method.write_buf" title="method tokio::io::AsyncWriteExt::write_buf"><code>tokio::io::AsyncWriteExt::write_buf</code></a> on any <code>AsyncWrite</code></li>
<li><a href="https://docs.rs/tokio-stream/0.1/tokio_stream/trait.StreamExt.html#method.next"><code>tokio_stream::StreamExt::next</code></a> on any <code>Stream</code></li>
<li><a href="https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.next"><code>futures::stream::StreamExt::next</code></a> on any <code>Stream</code></li>
</ul>
<p>The following methods are not cancellation safe and can lead to loss of data:</p>
<ul>
<li><a href="io/trait.AsyncReadExt.html#method.read_exact" title="method tokio::io::AsyncReadExt::read_exact"><code>tokio::io::AsyncReadExt::read_exact</code></a></li>
<li><a href="io/trait.AsyncReadExt.html#method.read_to_end" title="method tokio::io::AsyncReadExt::read_to_end"><code>tokio::io::AsyncReadExt::read_to_end</code></a></li>
<li><a href="io/trait.AsyncReadExt.html#method.read_to_string" title="method tokio::io::AsyncReadExt::read_to_string"><code>tokio::io::AsyncReadExt::read_to_string</code></a></li>
<li><a href="io/trait.AsyncWriteExt.html#method.write_all" title="method tokio::io::AsyncWriteExt::write_all"><code>tokio::io::AsyncWriteExt::write_all</code></a></li>
</ul>
<p>The following methods are not cancellation safe because they use a queue for
fairness and cancellation makes you lose your place in the queue:</p>
<ul>
<li><a href="sync/struct.Mutex.html#method.lock" title="method tokio::sync::Mutex::lock"><code>tokio::sync::Mutex::lock</code></a></li>
<li><a href="sync/struct.RwLock.html#method.read" title="method tokio::sync::RwLock::read"><code>tokio::sync::RwLock::read</code></a></li>
<li><a href="sync/struct.RwLock.html#method.write" title="method tokio::sync::RwLock::write"><code>tokio::sync::RwLock::write</code></a></li>
<li><a href="sync/struct.Semaphore.html#method.acquire" title="method tokio::sync::Semaphore::acquire"><code>tokio::sync::Semaphore::acquire</code></a></li>
<li><a href="sync/struct.Notify.html#method.notified" title="method tokio::sync::Notify::notified"><code>tokio::sync::Notify::notified</code></a></li>
</ul>
<p>To determine whether your own methods are cancellation safe, look at
where they use <code>.await</code>. This is because when an asynchronous method is
cancelled, that always happens at an <code>.await</code>. If your function behaves
correctly even if it is restarted while waiting at an <code>.await</code>, then it is
cancellation safe.</p>
<p>Cancellation safety can be defined in the following way: If you have a
future that has not yet completed, then it must be a no-op to drop that
future and recreate it. This definition is motivated by the situation where
a <code>select!</code> is used in a loop. Without this guarantee, you would lose your
progress when another branch completes and you restart the <code>select!</code> by
going around the loop.</p>
<p>Be aware that cancelling something that is not cancellation safe is not
necessarily wrong. For example, if you are cancelling a task because the
application is shutting down, then you probably don’t care that partially
read data is lost.</p>
<h2 id="examples"><a class="doc-anchor" href="#examples">§</a>Examples</h2>
<p>Basic select with two branches.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">async fn </span>do_stuff_async() {
    <span class="comment">// async work
</span>}

<span class="kw">async fn </span>more_async_work() {
    <span class="comment">// more here
</span>}

<span class="macro">tokio::select!</span> {
    <span class="kw">_ </span>= do_stuff_async() => {
        <span class="macro">println!</span>(<span class="string">"do_stuff_async() completed first"</span>)
    }
    <span class="kw">_ </span>= more_async_work() => {
        <span class="macro">println!</span>(<span class="string">"more_async_work() completed first"</span>)
    }
};</code></pre></div>
<p>Basic stream selecting.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};

<span class="kw">let </span><span class="kw-2">mut </span>stream1 = stream::iter(<span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>]);
<span class="kw">let </span><span class="kw-2">mut </span>stream2 = stream::iter(<span class="macro">vec!</span>[<span class="number">4</span>, <span class="number">5</span>, <span class="number">6</span>]);

<span class="kw">let </span>next = <span class="macro">tokio::select!</span> {
    v = stream1.next() => v.unwrap(),
    v = stream2.next() => v.unwrap(),
};

<span class="macro">assert!</span>(next == <span class="number">1 </span>|| next == <span class="number">4</span>);</code></pre></div>
<p>Collect the contents of two streams. In this example, we rely on pattern
matching and the fact that <code>stream::iter</code> is “fused”, i.e. once the stream
is complete, all calls to <code>next()</code> return <code>None</code>.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};

<span class="kw">let </span><span class="kw-2">mut </span>stream1 = stream::iter(<span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>]);
<span class="kw">let </span><span class="kw-2">mut </span>stream2 = stream::iter(<span class="macro">vec!</span>[<span class="number">4</span>, <span class="number">5</span>, <span class="number">6</span>]);

<span class="kw">let </span><span class="kw-2">mut </span>values = <span class="macro">vec!</span>[];

<span class="kw">loop </span>{
    <span class="macro">tokio::select!</span> {
        <span class="prelude-val">Some</span>(v) = stream1.next() => values.push(v),
        <span class="prelude-val">Some</span>(v) = stream2.next() => values.push(v),
        <span class="kw">else </span>=> <span class="kw">break</span>,
    }
}

values.sort();
<span class="macro">assert_eq!</span>(<span class="kw-2">&amp;</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>, <span class="number">4</span>, <span class="number">5</span>, <span class="number">6</span>], <span class="kw-2">&amp;</span>values[..]);</code></pre></div>
<p>Using the same future in multiple <code>select!</code> expressions can be done by passing
a reference to the future. Doing so requires the future to be <a href="https://doc.rust-lang.org/1.93.1/core/marker/trait.Unpin.html" title="trait core::marker::Unpin"><code>Unpin</code></a>. A
future can be made <a href="https://doc.rust-lang.org/1.93.1/core/marker/trait.Unpin.html" title="trait core::marker::Unpin"><code>Unpin</code></a> by either using <a href="https://doc.rust-lang.org/1.93.1/alloc/boxed/struct.Box.html#method.pin" title="associated function alloc::boxed::Box::pin"><code>Box::pin</code></a> or stack pinning.</p>
<p>Here, a stream is consumed for at most 1 second.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">use </span>tokio::time::{<span class="self">self</span>, Duration};

<span class="kw">let </span><span class="kw-2">mut </span>stream = stream::iter(<span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>]);
<span class="kw">let </span>sleep = time::sleep(Duration::from_secs(<span class="number">1</span>));
<span class="macro">tokio::pin!</span>(sleep);

<span class="kw">loop </span>{
    <span class="macro">tokio::select!</span> {
        maybe_v = stream.next() => {
            <span class="kw">if let </span><span class="prelude-val">Some</span>(v) = maybe_v {
                <span class="macro">println!</span>(<span class="string">"got = {}"</span>, v);
            } <span class="kw">else </span>{
                <span class="kw">break</span>;
            }
        }
        <span class="kw">_ </span>= <span class="kw-2">&amp;mut </span>sleep => {
            <span class="macro">println!</span>(<span class="string">"timeout"</span>);
            <span class="kw">break</span>;
        }
    }
}</code></pre></div>
<p>Joining two values using <code>select!</code>.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio::sync::oneshot;

<span class="kw">let </span>(tx1, <span class="kw-2">mut </span>rx1) = oneshot::channel();
<span class="kw">let </span>(tx2, <span class="kw-2">mut </span>rx2) = oneshot::channel();

tokio::spawn(<span class="kw">async move </span>{
    tx1.send(<span class="string">"first"</span>).unwrap();
});

tokio::spawn(<span class="kw">async move </span>{
    tx2.send(<span class="string">"second"</span>).unwrap();
});

<span class="kw">let </span><span class="kw-2">mut </span>a = <span class="prelude-val">None</span>;
<span class="kw">let </span><span class="kw-2">mut </span>b = <span class="prelude-val">None</span>;

<span class="kw">while </span>a.is_none() || b.is_none() {
    <span class="macro">tokio::select!</span> {
        v1 = (<span class="kw-2">&amp;mut </span>rx1), <span class="kw">if </span>a.is_none() => a = <span class="prelude-val">Some</span>(v1.unwrap()),
        v2 = (<span class="kw-2">&amp;mut </span>rx2), <span class="kw">if </span>b.is_none() => b = <span class="prelude-val">Some</span>(v2.unwrap()),
    }
}

<span class="kw">let </span>res = (a.unwrap(), b.unwrap());

<span class="macro">assert_eq!</span>(res.<span class="number">0</span>, <span class="string">"first"</span>);
<span class="macro">assert_eq!</span>(res.<span class="number">1</span>, <span class="string">"second"</span>);</code></pre></div>
<p>Using the <code>biased;</code> mode to control polling order.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">let </span><span class="kw-2">mut </span>count = <span class="number">0u8</span>;

<span class="kw">loop </span>{
    <span class="macro">tokio::select!</span> {
        <span class="comment">// If you run this example without `biased;`, the polling order is
        // pseudo-random, and the assertions on the value of count will
        // (probably) fail.
        </span>biased;

        <span class="kw">_ </span>= <span class="kw">async </span>{}, <span class="kw">if </span>count &lt; <span class="number">1 </span>=> {
            count += <span class="number">1</span>;
            <span class="macro">assert_eq!</span>(count, <span class="number">1</span>);
        }
        <span class="kw">_ </span>= <span class="kw">async </span>{}, <span class="kw">if </span>count &lt; <span class="number">2 </span>=> {
            count += <span class="number">1</span>;
            <span class="macro">assert_eq!</span>(count, <span class="number">2</span>);
        }
        <span class="kw">_ </span>= <span class="kw">async </span>{}, <span class="kw">if </span>count &lt; <span class="number">3 </span>=> {
            count += <span class="number">1</span>;
            <span class="macro">assert_eq!</span>(count, <span class="number">3</span>);
        }
        <span class="kw">_ </span>= <span class="kw">async </span>{}, <span class="kw">if </span>count &lt; <span class="number">4 </span>=> {
            count += <span class="number">1</span>;
            <span class="macro">assert_eq!</span>(count, <span class="number">4</span>);
        }

        <span class="kw">else </span>=> {
            <span class="kw">break</span>;
        }
    };
}</code></pre></div><h3 id="avoid-racy-if-preconditions"><a class="doc-anchor" href="#avoid-racy-if-preconditions">§</a>Avoid racy <code>if</code> preconditions</h3>
<p>Given that <code>if</code> preconditions are used to disable <code>select!</code> branches, some
caution must be used to avoid missing values.</p>
<p>For example, here is <strong>incorrect</strong> usage of <code>sleep</code> with <code>if</code>. The objective
is to repeatedly run an asynchronous task for up to 50 milliseconds.
However, there is a potential for the <code>sleep</code> completion to be missed.</p>
<div class="example-wrap should_panic"><a href="#" class="tooltip" title="This example panics">ⓘ</a><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio::time::{<span class="self">self</span>, Duration};

<span class="kw">async fn </span>some_async_work() {
    <span class="comment">// do work
</span>}

<span class="kw">let </span>sleep = time::sleep(Duration::from_millis(<span class="number">50</span>));
<span class="macro">tokio::pin!</span>(sleep);

<span class="kw">while </span>!sleep.is_elapsed() {
    <span class="macro">tokio::select!</span> {
        <span class="kw">_ </span>= <span class="kw-2">&amp;mut </span>sleep, <span class="kw">if </span>!sleep.is_elapsed() => {
            <span class="macro">println!</span>(<span class="string">"operation timed out"</span>);
        }
        <span class="kw">_ </span>= some_async_work() => {
            <span class="macro">println!</span>(<span class="string">"operation completed"</span>);
        }
    }
}

<span class="macro">panic!</span>(<span class="string">"This example shows how not to do it!"</span>);</code></pre></div>
<p>In the above example, <code>sleep.is_elapsed()</code> may return <code>true</code> even if
<code>sleep.poll()</code> never returned <code>Ready</code>. This opens up a potential race
condition where <code>sleep</code> expires between the <code>while !sleep.is_elapsed()</code>
check and the call to <code>select!</code>, so that the <code>some_async_work()</code> call
runs uninterrupted despite the sleep having elapsed.</p>
<p>One way to write the above example without the race would be:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio::time::{<span class="self">self</span>, Duration};

<span class="kw">async fn </span>some_async_work() {
    <span class="comment">// do work
</span>}

<span class="kw">let </span>sleep = time::sleep(Duration::from_millis(<span class="number">50</span>));
<span class="macro">tokio::pin!</span>(sleep);

<span class="kw">loop </span>{
    <span class="macro">tokio::select!</span> {
        <span class="kw">_ </span>= <span class="kw-2">&amp;mut </span>sleep => {
            <span class="macro">println!</span>(<span class="string">"operation timed out"</span>);
            <span class="kw">break</span>;
        }
        <span class="kw">_ </span>= some_async_work() => {
            <span class="macro">println!</span>(<span class="string">"operation completed"</span>);
        }
    }
}</code></pre></div><h2 id="alternatives-from-the-ecosystem"><a class="doc-anchor" href="#alternatives-from-the-ecosystem">§</a>Alternatives from the Ecosystem</h2>
<p>The <code>select!</code> macro is a powerful tool for managing multiple asynchronous
branches, enabling tasks to run concurrently within the same thread. However,
its use can introduce challenges, particularly around cancellation safety, which
can lead to subtle and hard-to-debug errors. For many use cases, ecosystem
alternatives may be preferable as they mitigate these concerns by offering
clearer syntax and more predictable control flow, and by reducing the need to manually
handle issues like fuse semantics or cancellation safety.</p>
<h3 id="merging-streams"><a class="doc-anchor" href="#merging-streams">§</a>Merging Streams</h3>
<p>For cases where <code>loop { select! { ... } }</code> is used to poll multiple tasks,
stream merging offers a concise alternative that inherently handles cancellation-safe
processing, removing the risk of data loss. Libraries such as <a href="https://docs.rs/tokio-stream/latest/tokio_stream/"><code>tokio_stream</code></a>,
<a href="https://docs.rs/futures/latest/futures/stream/"><code>futures::stream</code></a> and <a href="https://docs.rs/futures-concurrency/latest/futures_concurrency/"><code>futures_concurrency</code></a> provide tools for merging
streams and handling their outputs sequentially.</p>
<h4 id="example-with-select"><a class="doc-anchor" href="#example-with-select">§</a>Example with <code>select!</code></h4>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">struct </span>File;
<span class="kw">struct </span>Channel;
<span class="kw">struct </span>Socket;

<span class="kw">impl </span>Socket {
    <span class="kw">async fn </span>read_packet(<span class="kw-2">&amp;mut </span><span class="self">self</span>) -> Vec&lt;u8&gt; {
        <span class="macro">vec!</span>[]
    }
}

<span class="kw">async fn </span>read_send(_file: <span class="kw-2">&amp;mut </span>File, _channel: <span class="kw-2">&amp;mut </span>Channel) {
    <span class="comment">// do work that is not cancel safe
</span>}

<span class="comment">// open our IO types
</span><span class="kw">let </span><span class="kw-2">mut </span>file = File;
<span class="kw">let </span><span class="kw-2">mut </span>channel = Channel;
<span class="kw">let </span><span class="kw-2">mut </span>socket = Socket;

<span class="kw">loop </span>{
    <span class="macro">tokio::select!</span> {
        <span class="kw">_ </span>= read_send(<span class="kw-2">&amp;mut </span>file, <span class="kw-2">&amp;mut </span>channel) => { <span class="comment">/* ... */ </span>},
        _data = socket.read_packet() => { <span class="comment">/* ... */ </span>}
        <span class="kw">_ </span>= futures::future::ready(()) => <span class="kw">break
    </span>}
}</code></pre></div><h4 id="moving-to-merge"><a class="doc-anchor" href="#moving-to-merge">§</a>Moving to <code>merge</code></h4>
<p>By using merge, you can unify multiple asynchronous tasks into a single stream,
eliminating the need to manage tasks manually and reducing the risk of
unintended behavior like data loss.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>std::pin::pin;

<span class="kw">use </span>futures::stream::unfold;
<span class="kw">use </span>tokio_stream::StreamExt;

<span class="kw">struct </span>File;
<span class="kw">struct </span>Channel;
<span class="kw">struct </span>Socket;

<span class="kw">impl </span>Socket {
    <span class="kw">async fn </span>read_packet(<span class="kw-2">&amp;mut </span><span class="self">self</span>) -> Vec&lt;u8&gt; {
        <span class="macro">vec!</span>[]
    }
}

<span class="kw">async fn </span>read_send(_file: <span class="kw-2">&amp;mut </span>File, _channel: <span class="kw-2">&amp;mut </span>Channel) {
    <span class="comment">// do work that is not cancel safe
</span>}

<span class="kw">enum </span>Message {
    Stop,
    Sent,
    Data(Vec&lt;u8&gt;),
}

<span class="comment">// open our IO types
</span><span class="kw">let </span>file = File;
<span class="kw">let </span>channel = Channel;
<span class="kw">let </span>socket = Socket;

<span class="kw">let </span>a = unfold((file, channel), |(<span class="kw-2">mut </span>file, <span class="kw-2">mut </span>channel)| <span class="kw">async </span>{
    read_send(<span class="kw-2">&amp;mut </span>file, <span class="kw-2">&amp;mut </span>channel).<span class="kw">await</span>;
    <span class="prelude-val">Some</span>((Message::Sent, (file, channel)))
});
<span class="kw">let </span>b = unfold(socket, |<span class="kw-2">mut </span>socket| <span class="kw">async </span>{
    <span class="kw">let </span>data = socket.read_packet().<span class="kw">await</span>;
    <span class="prelude-val">Some</span>((Message::Data(data), socket))
});
<span class="kw">let </span>c = tokio_stream::iter([Message::Stop]);

<span class="kw">let </span><span class="kw-2">mut </span>s = <span class="macro">pin!</span>(a.merge(b).merge(c));
<span class="kw">while let </span><span class="prelude-val">Some</span>(msg) = s.next().<span class="kw">await </span>{
    <span class="kw">match </span>msg {
        Message::Data(_data) => { <span class="comment">/* ... */ </span>}
        Message::Sent => <span class="kw">continue</span>,
        Message::Stop => <span class="kw">break</span>,
    }
}</code></pre></div><h3 id="racing-futures"><a class="doc-anchor" href="#racing-futures">§</a>Racing Futures</h3>
<p>If you need to wait for the first completion among several asynchronous tasks,
ecosystem utilities such as
<a href="https://docs.rs/futures/latest/futures/"><code>futures</code></a>,
<a href="https://docs.rs/futures-lite/latest/futures_lite/"><code>futures-lite</code></a> or
<a href="https://docs.rs/futures-concurrency/latest/futures_concurrency/"><code>futures-concurrency</code></a>
provide streamlined syntax for racing futures:</p>
<ul>
<li><a href="https://docs.rs/futures-concurrency/latest/futures_concurrency/future/trait.Race.html"><code>futures_concurrency::future::Race</code></a></li>
<li><a href="https://docs.rs/futures/latest/futures/macro.select.html"><code>futures::select</code></a></li>
<li><a href="https://docs.rs/futures/latest/futures/stream/select_all/index.html"><code>futures::stream::select_all</code></a> (for streams)</li>
<li><a href="https://docs.rs/futures-lite/latest/futures_lite/future/fn.or.html"><code>futures_lite::future::or</code></a></li>
<li><a href="https://docs.rs/futures-lite/latest/futures_lite/future/fn.race.html"><code>futures_lite::future::race</code></a></li>
</ul>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>futures_concurrency::future::Race;

<span class="kw">let </span>task_a = <span class="kw">async </span>{ <span class="prelude-val">Ok</span>(<span class="string">"ok"</span>) };
<span class="kw">let </span>task_b = <span class="kw">async </span>{ <span class="prelude-val">Err</span>(<span class="string">"error"</span>) };
<span class="kw">let </span>result = (task_a, task_b).race().<span class="kw">await</span>;

<span class="kw">match </span>result {
    <span class="prelude-val">Ok</span>(output) => <span class="macro">println!</span>(<span class="string">"First task completed with: {output}"</span>),
    <span class="prelude-val">Err</span>(err) => <span class="macro">eprintln!</span>(<span class="string">"Error occurred: {err}"</span>),
}</code></pre></div></div></details></section></div></main></body></html>