Journey In Rust: Adding Animation and Concurrency in Rust - Part 8

Hey there, Rustaceans! It's time to make our Rust app more user-friendly by adding some cool animations. Today, we're going to add a spinning animation and explore concurrency in Rust. Are you ready to add some pizzazz to our app? Let's go!

Time for Some Animations

Our Rust app is now functional, but it's not very interactive. Users don't get any feedback while waiting for the app to process their input. So, we're going to add some animations to spice things up!

After searching for the best way to add animations, I found two options: a progress bar and a spinner. A progress bar might be more informative, but it's not suitable for our use case. So, I opted for a spinner instead. I found two Rust crates that can help us with this: spinner and indicatif.

I tried to use spinner first, but the documentation wasn't very clear, and I couldn't figure out how to use it. So, I decided to go with indicatif. Getting it to do what I wanted was still a bit of a challenge, but it turned out to be a valuable learning experience.

All About Indicatif

Indicatif is a Rust library for adding progress indicators to command-line applications. It offers progress bars, spinners, and basic color support. You can add it to your Cargo.toml file using this command:

cargo add indicatif

Choosing the Right Animation

After much thought, I decided on a spinner animation that looks like this:

⠏ Searching

The spinner will keep spinning (of course!), and there will be a message indicating what's happening. It's simple and elegant, so let's use this for our app.

Setting Up the Spinner

We'll start with the basic boilerplate code from the indicatif documentation:

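use indicatif::{ProgressBar, ProgressStyle};
use std::time::Duration;

fn main() {
    let spinner = ProgressBar::new_spinner();
    spinner.set_style(
        ProgressStyle::default_spinner()
            // Assuming indicatif 0.17, where `template` returns a Result
            .template("{spinner} {wide_msg}")
            .expect("template string should be valid")
            .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"),
    );
    spinner.set_message("Processing...");
    // Redraw the spinner every 100 ms from a background thread
    spinner.enable_steady_tick(Duration::from_millis(100));
}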

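The enable_steady_tick call asks indicatif to redraw the spinner every 100 ms in the background, so we never call tick() ourselves. However, we want the spinner to follow our own processing and run until it is complete. To try that, we can remove the enable_steady_tick line and advance the spinner manually: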

    // Needs: use std::{thread, time::Duration};
    let spinner = ProgressBar::new_spinner();
    spinner.set_style(
        ProgressStyle::default_spinner()
            .template("{spinner} {wide_msg}")
            .expect("template string should be valid")
            .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"),
    );
    spinner.set_message("Processing...");

    for _ in 0..10 {
        // Advance the spinner by one frame
        spinner.tick();
        // Perform your task here
        thread::sleep(Duration::from_millis(500));
    }

    spinner.finish_with_message("Done!");

Elegant as it may seem, the spinner only advances when tick() is called, which happens once per finished task. So we're stuck with a lousy animation that runs on the same thread as our work and only changes when a task is done. It's like playing a video game that ignores your key presses while it's busy loading. Here's an example to make this clearer.

Normally:

Let's say you had to download a file and process it (say, change the font color or something), and then download another file and process it. Here's how that would look:

In this normal (sequential) code example, Task 1 (Download File A) is executed first, followed by Task 2 (Process File A). Once Task 2 is completed, Task 3 (Download File B) begins, and finally, Task 4 (Process File B) is executed. Each task must finish before the next one begins, so the tasks are executed sequentially.

What could have been:

In this concurrent code example, Task 1 (Download File A) and Task 2 (Download File B) are started concurrently. This means that both tasks can run simultaneously, reducing the overall time spent waiting for downloads to finish. Once Task 1 is completed, Task 3 (Process File A) begins, and similarly, once Task 2 is completed, Task 4 (Process File B) begins. This allows for faster overall execution since both files are downloaded and processed in parallel.


So doing things one by one means doing everything on the same thread, whereas to do things at the same time we have to move some of the work onto a separate thread, as in the example above. Then both can make progress together.
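To make the difference concrete, here's a minimal sketch using plain std threads. The download and process functions are made-up stand-ins (a sleep and an upper-casing step), not code from our app, so treat it as an illustration of the idea rather than the real thing:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for a slow download: sleeps, then returns fake contents.
fn download(name: &str) -> String {
    thread::sleep(Duration::from_millis(200));
    format!("contents of {}", name)
}

// Hypothetical stand-in for processing: upper-cases the contents.
fn process(contents: &str) -> String {
    contents.to_uppercase()
}

// Sequential: each step waits for the previous one to finish.
fn run_sequential() -> Vec<String> {
    let a = process(&download("A"));
    let b = process(&download("B"));
    vec![a, b]
}

// Concurrent: each file gets its own thread, so both downloads wait at the same time.
fn run_concurrent() -> Vec<String> {
    let handles: Vec<_> = vec!["A", "B"]
        .into_iter()
        .map(|name| thread::spawn(move || process(&download(name))))
        .collect();

    // Joining in spawn order keeps the results in the same order as the inputs.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let start = Instant::now();
    let sequential = run_sequential();
    println!("sequential: {:?} in {:?}", sequential, start.elapsed());

    let start = Instant::now();
    let concurrent = run_concurrent();
    println!("concurrent: {:?} in {:?}", concurrent, start.elapsed());
}
```

Both versions produce the same results; the concurrent one should simply spend less time waiting, because the two sleeps overlap.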

Taming Concurrency

Concurrency is all about managing multiple tasks in progress, even if they don't execute simultaneously. Think of it as multiple baristas working together, preparing coffee, taking orders, and collecting payments, all happening in a smooth and efficient manner.

Implementing Concurrency in Rust

Get ready for a wild ride as we explore the fantastic duo of Arc and Mutex in Rust! We'll unravel the mystery behind their purpose and how they work together in harmony.

The Arc: Atomic Reference Counting

Picture yourself at a party with a plate of delicious cookies to share. You want to make sure everyone gets an equal share, but how do you keep track of who took how many cookies? Enter Arc - our reference-counting party host!

Arc in Rust lets multiple threads share ownership of a resource by keeping an atomic count of the references to it. Each time a handle is cloned, the count goes up; each time a handle is dropped, it goes down. Once the count reaches zero, the resource is dropped and its memory is freed.
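Here's a small sketch of that tally in action, using only the standard library. The share_plate function and the cookie list are invented for this illustration:

```rust
use std::sync::Arc;
use std::thread;

// Hands a clone of the plate to each guest thread and reports the reference
// count while the clones exist and again after every guest has finished.
fn share_plate(guests: usize) -> (usize, usize) {
    let cookies = Arc::new(vec!["chocolate chip", "oatmeal", "ginger"]);

    // One extra handle per guest; the data itself is not copied.
    let plates: Vec<Arc<Vec<&str>>> = (0..guests).map(|_| Arc::clone(&cookies)).collect();
    let while_shared = Arc::strong_count(&cookies);

    let handles: Vec<_> = plates
        .into_iter()
        .map(|plate| thread::spawn(move || plate.len()))
        .collect();
    for handle in handles {
        handle.join().expect("guest thread panicked");
    }

    // Every guest dropped its handle when its thread finished.
    let after_party = Arc::strong_count(&cookies);
    (while_shared, after_party)
}

fn main() {
    let (while_shared, after_party) = share_plate(3);
    println!("handles while sharing: {}", while_shared);
    println!("handles after the party: {}", after_party);
}
```

Note that Arc on its own only gives shared read access. To change the shared value we need something more, which is where the next section comes in.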

The Mutex: Mutual Exclusion

Now that we have Arc to manage shared resources, we need someone to maintain order - that's where Mutex, our friendly concurrency bouncer, comes in!

Mutex ensures that only one thread can access the shared resource at a time, maintaining order and preventing chaos. It's like a lock on a shared resource that a thread must obtain before accessing it.
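As a quick sketch with std's Mutex (the cookie jar and take_cookie are made up for this example): the lock is held for as long as the guard returned by lock() is alive, and released when the guard is dropped.

```rust
use std::sync::Mutex;

// Takes one cookie if any are left. The lock is held only inside this function:
// the guard is dropped, and the mutex unlocked, when the function returns.
fn take_cookie(jar: &Mutex<u32>) -> bool {
    let mut cookies = jar.lock().expect("mutex poisoned");
    if *cookies == 0 {
        return false;
    }
    *cookies -= 1;
    true
}

fn main() {
    let jar = Mutex::new(2u32);

    let guard = jar.lock().expect("mutex poisoned");
    // While the guard is alive, nobody else can get in.
    println!("can lock again while held? {}", jar.try_lock().is_ok());
    drop(guard);

    println!("took a cookie: {}", take_cookie(&jar));
    println!("cookies left: {}", *jar.lock().expect("mutex poisoned"));
}
```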

The Dynamic Duo: Arc + Mutex

When Arc and Mutex join forces, we get a powerful combo that ensures our shared resources are managed efficiently and safely! It's like having a well-coordinated party where everyone gets their fair share of cookies, all thanks to our dynamic duo.

In Rust, you'll often see Arc and Mutex used together for a safe and efficient way to share mutable resources across multiple threads. By wrapping a Mutex with an Arc, we ensure that our Mutex-protected resource is safely shared and reference-counted.
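Before applying this to the spinner, here's the pattern in its simplest form: a counter shared by several std threads. This is a generic sketch (count_with_threads is a name I made up), not code from our app:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Spawns `threads` workers that each add 1 to a shared counter `increments` times.
fn count_with_threads(threads: u32, increments: u32) -> u32 {
    // Arc shares ownership across threads; Mutex guards the mutation.
    let counter = Arc::new(Mutex::new(0u32));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments {
                    // The lock is taken and released on every iteration.
                    *counter.lock().expect("mutex poisoned") += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    let total = *counter.lock().expect("mutex poisoned");
    total
}

fn main() {
    println!("total: {}", count_with_threads(4, 1000));
}
```

No increment is lost, because only one thread at a time can hold the lock.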

Let's create a shared spinner using Arc and Mutex:

use tokio::sync::Mutex;
use std::sync::Arc;

let shared_spinner: Arc<Mutex<ProgressBar>> = Arc::new(Mutex::new(spinner));

Our shared_spinner object can now be safely shared across multiple threads. Let's create a function to update the spinner message:

async fn update_spinner(spinner: Arc<Mutex<ProgressBar>>, message: &str) {
    let spinner = spinner.lock().await;
    spinner.set_message(message);
    spinner.tick();
}

Let's update the handle_request function to use it:

async fn handle_request(
    query: String,
    tokens: u32,
    spinner: Arc<Mutex<ProgressBar>>,
) -> Result<(), Box<dyn Error>> {
    // let mut success = false;
    let mut command = None;
    for i in 0..3 {
        // Show which attempt we are on
        update_spinner(
            Arc::clone(&spinner),
            &format!("Searching ({}) ....", i),
        )
        .await;

        let response: ApiResponse = get_response(query.clone(), tokens).await?;

        if let Ok(parsed_command) =
            serde_json::from_str::<Instructions>(&response.choices[0].message.content)
        {
            command = Some(parsed_command);
            break;
        }
    }
    // ...
    Ok(())
}

Error Handling

So, we were coding an async function called update_spinner and encountered a compiler error:

63 | async fn update_spinner(spinner: Arc<Mutex<ProgressBar>>, message: &str) {
   |                                                           -------  - let's call the lifetime of this reference `'1`
   |                                                           |
   |                                                           `message` is a reference that is only valid in the function body
64 |     let mut spinner = spinner.lock().await;
65 |     spinner.set_message(message);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |     |
   |     `message` escapes the function body here
   |     argument requires that `'1` must outlive `'static`

Essentially, what this means is that we have a lifetime mismatch. The set_message method of the ProgressBar struct expects a message that can be converted into a Cow<'static, str>: either a string slice with a 'static lifetime (valid for the entire duration of the program) or an owned String. But we were passing a reference with a shorter lifetime.

To resolve this error, we change update_spinner to take a Cow<'static, str> instead of a &str. Callers wrap a string literal in Cow::Borrowed or a freshly formatted String in Cow::Owned, so the message passed to set_message() always satisfies the 'static requirement.

use std::borrow::Cow;
async fn update_spinner(spinner: Arc<Mutex<ProgressBar>>, message: Cow<'static, str>) {
    let spinner = spinner.lock().await;
    spinner.set_message(message);
    spinner.tick();
}

async fn handle_request(
    query: String,
    tokens: u32,
    spinner: Arc<Mutex<ProgressBar>>,
) -> Result<(), Box<dyn std::error::Error>> {
    // ...
    // Inside the retry loop, where `i` is the attempt counter,
    // update the spinner's message using the `update_spinner` function:
    update_spinner(
        Arc::clone(&spinner),
        Cow::Owned(format!("Searching ({}) ....", i)),
    )
    .await;
    // Perform step 1...

    update_spinner(Arc::clone(&spinner), Cow::Borrowed("Processing step 2...")).await;
    // Perform step 2...

    // ...

    Ok(())
}
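To see why Cow<'static, str> gets us out of the lifetime trouble, here's a std-only sketch. FakeSpinner is a made-up stand-in whose set_message mimics the shape of the real one (an argument of type impl Into<Cow<'static, str>>); it is not indicatif's actual implementation:

```rust
use std::borrow::Cow;

// Hypothetical stand-in for a type that stores a message it may keep forever.
struct FakeSpinner {
    message: Cow<'static, str>,
}

impl FakeSpinner {
    // Accepts anything convertible into a Cow<'static, str>.
    fn set_message(&mut self, msg: impl Into<Cow<'static, str>>) {
        self.message = msg.into();
    }
}

// Reports which Cow variant is holding the text.
fn kind(message: &Cow<'static, str>) -> &'static str {
    match message {
        Cow::Borrowed(_) => "borrowed",
        Cow::Owned(_) => "owned",
    }
}

fn main() {
    let mut spinner = FakeSpinner { message: Cow::Borrowed("") };

    // A string literal is a &'static str, so it is stored without allocating.
    spinner.set_message("Processing step 2...");
    println!("{} ({})", spinner.message, kind(&spinner.message));

    // A String owns its data, so it is moved in and needs no borrow at all.
    spinner.set_message(format!("Searching ({}) ....", 1));
    println!("{} ({})", spinner.message, kind(&spinner.message));
}
```

A &str borrowed from a local String fits neither case, which is exactly what the compiler was complaining about.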

Now, our code runs without any errors. However, we noticed that the animation wasn't moving at all. This is because the spinner only advances when update_spinner calls tick(), and while handle_request is awaiting the API response nothing calls it. To resolve this issue, we need to keep the spinner ticking at all times in its own task and find a way to update the message from handle_request without stalling it.

Hello, Rustaceans! Welcome back to our journey of learning Rust and building an open-source tool. In this episode, we're going to learn how to keep the spinner rotating while processing other tasks with message passing. It's like having a conveyor belt in a factory, ensuring smooth operations without delays. Let's get started!

Spinning in Sync

Previously, our spinner wasn't rotating as expected because nothing ticked it while handle_request() sat on its await calls. To overcome this, we'll use the mpsc (multi-producer, single-consumer) channel provided by the tokio crate to send messages between different asynchronous tasks. This way, our spinner keeps rotating while other tasks are being processed.

Picture the mpsc channel as a conveyor belt in a factory, where workers (asynchronous tasks) place items (messages) onto the belt. On the other end of the belt, there's another worker (the spinner task) responsible for updating the spinner based on the messages received.
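Here's the conveyor belt idea as a tiny sketch. To keep it dependency-free it uses std::sync::mpsc with a thread rather than tokio's async channel, so the calls are blocking instead of awaited, but the shape is the same. The run_conveyor function is invented for this example:

```rust
use std::sync::mpsc;
use std::thread;

// Sends every item down the channel and returns what the consumer made of them.
fn run_conveyor(items: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // The worker at the end of the belt: handles messages in the order they arrive.
    let consumer = thread::spawn(move || {
        let mut shown = Vec::new();
        // recv() returns Err once every sender has been dropped, which ends the loop.
        while let Ok(message) = rx.recv() {
            shown.push(format!("spinner shows: {}", message));
        }
        shown
    });

    // The workers placing items on the belt (here just the current thread).
    for item in items {
        tx.send(item).expect("receiver hung up");
    }
    // Dropping the sender closes the channel and lets the consumer finish.
    drop(tx);

    consumer.join().expect("consumer thread panicked")
}

fn main() {
    let shown = run_conveyor(vec!["Searching (1)".to_string(), "Done".to_string()]);
    for line in shown {
        println!("{}", line);
    }
}
```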

Let's make update_spinner public and add a new function called create_spinner:

pub async fn update_spinner(spinner: Arc<Mutex<ProgressBar>>, message: Cow<'static, str>) {
    let spinner = spinner.lock().await;
    spinner.set_message(message);
    spinner.tick();
}

pub fn create_spinner() -> Result<Arc<Mutex<ProgressBar>>, Box<dyn Error>> {
    // (implementation details will be explained below)
}

Now, we'll update the main function to use the new spinner functions and add the mpsc channel for communication:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // ...
    let shared_spinner = create_spinner()?;
    let (tx, mut rx) = mpsc::channel::<Cow<'static, str>>(30);

    // (implementation details will be explained below)
}

Implementing the Spinner Functions

Let's dive into the implementation of the create_spinner function and the spinner tasks in the main function:

pub fn create_spinner() -> Result<Arc<Mutex<ProgressBar>>, Box<dyn Error>> {
    let spinner = ProgressBar::new_spinner();
    spinner.set_style(
        ProgressStyle::default_spinner()
            .template("{spinner} {wide_msg}")?
            .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"),
    );
    let shared_spinner: Arc<Mutex<ProgressBar>> = Arc::new(Mutex::new(spinner));
    Ok(shared_spinner)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // ...
    let spinner_tick_task = {
        let shared_spinner_clone = Arc::clone(&shared_spinner);
        task::spawn(async move {
            loop {
                {
                    let spinner = shared_spinner_clone.lock().await;
                    spinner.tick();
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        })
    };

    let spinner_message_task = task::spawn({
        let shared_spinner_clone = Arc::clone(&shared_spinner);
        async move {
            while let Some(message) = rx.recv().await {
                update_spinner(Arc::clone(&shared_spinner_clone), message)
                .await;
            }
        }
    });

    handle_request(query, tokens, tx).await?;

    spinner_tick_task.abort(); // Stop the spinner tick task
    spinner_message_task.abort();
    shared_spinner.lock().await.finish_and_clear();

    Ok(())
}
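If the moving parts are hard to follow, here's the same structure boiled down to a std-only sketch: one thread ticks, one thread applies messages, and the main thread does the "work". FakeSpinner, run_spinner_demo, and the stop flag are all made up for this illustration; our real code uses tokio tasks and abort() instead of threads and a flag.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for indicatif's ProgressBar.
#[derive(Default)]
struct FakeSpinner {
    ticks: u32,
    message: String,
}

// Returns how many times the spinner ticked and the last message it showed.
fn run_spinner_demo(messages: Vec<&'static str>) -> (u32, String) {
    let spinner = Arc::new(Mutex::new(FakeSpinner::default()));
    let stop = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<&'static str>();

    // Tick thread: locks briefly, ticks, and sleeps without holding the lock.
    let ticker = {
        let spinner = Arc::clone(&spinner);
        let stop = Arc::clone(&stop);
        thread::spawn(move || loop {
            spinner.lock().expect("mutex poisoned").ticks += 1;
            if stop.load(Ordering::SeqCst) {
                break;
            }
            thread::sleep(Duration::from_millis(10));
        })
    };

    // Message thread: applies each message as it arrives on the channel.
    let messenger = {
        let spinner = Arc::clone(&spinner);
        thread::spawn(move || {
            while let Ok(message) = rx.recv() {
                spinner.lock().expect("mutex poisoned").message = message.to_string();
            }
        })
    };

    // The "work": report progress through the channel, never touching the lock.
    for message in messages {
        tx.send(message).expect("receiver hung up");
        thread::sleep(Duration::from_millis(30)); // pretend to be busy
    }

    // Closing the channel ends the message loop; the flag ends the tick loop.
    drop(tx);
    messenger.join().expect("message thread panicked");
    stop.store(true, Ordering::SeqCst);
    ticker.join().expect("tick thread panicked");

    let state = spinner.lock().expect("mutex poisoned");
    (state.ticks, state.message.clone())
}

fn main() {
    let (ticks, message) = run_spinner_demo(vec!["Searching (1)", "Parsing response", "Done"]);
    println!("ticked {} times, last message: {}", ticks, message);
}
```

One design difference worth noting: this sketch lets the message loop drain and exit on its own once the sender is dropped, so the last message is always applied before shutdown.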

With the mpsc channel in place, we can update the rest of the code to send its status messages through it:

async fn handle_request(
    query: String,
    tokens: u32,
    spinner_tx: mpsc::Sender<Cow<'static, str>>,
) -> Result<(), Box<dyn Error>> {
    let mut command = None;
    for i in 0..3 {
        spinner_tx
            .send(Cow::Owned(format!("Searching ({})", i + 1)))
            .await?;

        let response: ApiResponse = get_response(query.clone(), tokens).await?;

        spinner_tx.send(Cow::Borrowed("Parsing response")).await?;

        if let Ok(parsed_command) =
            serde_json::from_str::<Instructions>(&response.choices[0].message.content)
        {
            command = Some(parsed_command);
            break;
        }
    }

    match command {
        Some(command) => {
            spinner_tx.send(Cow::Borrowed("Executing command")).await?;
            handle_external_commands(&command, spinner_tx).await?;
            println!("{}", command.instruction_commands[0]);
        }
        None => {
            println!("Error in parsing the response, Please try again with a different query");
        }
    }
    Ok(())
}


pub async fn handle_external_commands(
    command: &Instructions,
    spinner_tx: mpsc::Sender<Cow<'static, str>>,
) -> Result<(), mpsc::error::SendError<Cow<'static, str>>> {
    let mut found_one = false;

    // Use a regular `for` loop with `enumerate()` instead of `for_each`
    for (index, tool) in command.external_commands.iter().enumerate() {
        spinner_tx
            .send(Cow::Owned(format!(
                "Checking for ({}) ...",
                tool.to_owned()
            )))
            .await?;

        let output = Command::new("which").arg(tool.trim()).output();
        if let Ok(output) = output {
            if !output.status.success() {
                if !found_one {
                    println!("Run the following commands to install the required tools:");
                    found_one = true;
                }
                println!(
                    "{}",
                    command
                        .external_install
                        .get(index)
                        .expect("Index out of bounds")
                );
            }
        }
    }
    spinner_tx.send(Cow::Borrowed("Done")).await?;
    Ok(())
}

Here is a visualization of what's happening:

Cover: Bhupesh

🔗 Repository: termoil