Skip to content

Commit b565604

Browse files
committed
⚡️ Replace unbounded channels with bounded channels and cancellation tokens
Switch content update channels from unbounded to bounded (capacity 100) for better backpressure handling. Replace AtomicBool task coordination with CancellationToken for cleaner async task lifecycle management. The status polling task now uses tokio::select! with the cancellation token and an interval timer, while the content update listener uses select! for zero-latency message handling instead of polling with try_recv.
1 parent 7f59388 commit b565604

File tree

4 files changed

+50
-44
lines changed

4 files changed

+50
-44
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ thiserror = "2.0.4"
5656
tiktoken-rs = "0.6.0"
5757
tokio = { version = "1.44.2", features = ["full"] }
5858
tokio-retry = "0.3.0"
59+
tokio-util = "0.7"
5960
toml = "0.8.19"
6061
tracing = "0.1.40"
6162
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "time", "json", "chrono"] }

src/agents/tools/content_update.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ pub enum ContentUpdate {
3131
Review { content: String },
3232
}
3333

34+
/// Channel capacity for content updates
35+
pub const CONTENT_UPDATE_CHANNEL_CAPACITY: usize = 100;
36+
3437
/// Sender for content updates - passed to tools and listened to by Studio
35-
pub type ContentUpdateSender = mpsc::UnboundedSender<ContentUpdate>;
36-
pub type ContentUpdateReceiver = mpsc::UnboundedReceiver<ContentUpdate>;
38+
pub type ContentUpdateSender = mpsc::Sender<ContentUpdate>;
39+
pub type ContentUpdateReceiver = mpsc::Receiver<ContentUpdate>;
3740

38-
/// Create a new content update channel
41+
/// Create a new bounded content update channel
3942
pub fn create_content_update_channel() -> (ContentUpdateSender, ContentUpdateReceiver) {
40-
mpsc::unbounded_channel()
43+
mpsc::channel(CONTENT_UPDATE_CHANNEL_CAPACITY)
4144
}
4245

4346
// ═══════════════════════════════════════════════════════════════════════════════
@@ -98,7 +101,7 @@ impl Tool for UpdateCommitTool {
98101
message: args.message.clone().unwrap_or_default(),
99102
};
100103

101-
self.sender.send(update).map_err(|e| {
104+
self.sender.try_send(update).map_err(|e| {
102105
tracing::error!("Failed to send content update: {}", e);
103106
ContentUpdateError(format!("Failed to send update: {}", e))
104107
})?;
@@ -161,7 +164,7 @@ impl Tool for UpdatePRTool {
161164
};
162165

163166
self.sender
164-
.send(update)
167+
.try_send(update)
165168
.map_err(|e| ContentUpdateError(format!("Failed to send update: {}", e)))?;
166169

167170
let result = json!({
@@ -219,7 +222,7 @@ impl Tool for UpdateReviewTool {
219222
};
220223

221224
self.sender
222-
.send(update)
225+
.try_send(update)
223226
.map_err(|e| ContentUpdateError(format!("Failed to send update: {}", e)))?;
224227

225228
let result = json!({

src/studio/app.rs

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -743,8 +743,7 @@ impl StudioApp {
743743
use crate::agents::status::IRIS_STATUS;
744744
use crate::agents::tools::{ContentUpdate, create_content_update_channel};
745745
use crate::studio::state::{ChatMessage, ChatRole};
746-
use std::sync::Arc;
747-
use std::sync::atomic::{AtomicBool, Ordering};
746+
use tokio_util::sync::CancellationToken;
748747

749748
let Some(agent) = self.agent_service.clone() else {
750749
let tx = self.iris_result_tx.clone();
@@ -754,7 +753,7 @@ impl StudioApp {
754753
return;
755754
};
756755

757-
// Create content update channel for tool-based updates
756+
// Create bounded content update channel for tool-based updates
758757
let (content_tx, mut content_rx) = create_content_update_channel();
759758

760759
// Capture context before spawning async task
@@ -772,44 +771,50 @@ impl StudioApp {
772771
.current_content
773772
.or_else(|| self.get_current_content_for_chat());
774773

775-
// Flag to signal when the main task is done
776-
let is_done = Arc::new(AtomicBool::new(false));
777-
let is_done_clone = is_done.clone();
778-
let is_done_updates = is_done.clone();
774+
// Cancellation token to signal when the main task is done
775+
let cancel_token = CancellationToken::new();
776+
let cancel_status = cancel_token.clone();
777+
let cancel_updates = cancel_token.clone();
779778

780-
// Spawn a status polling task
779+
// Spawn a status polling task (polls global state, so still uses interval)
781780
tokio::spawn(async move {
782781
use crate::agents::status::IrisPhase;
783782
let mut last_tool: Option<String> = None;
784-
785-
while !is_done_clone.load(Ordering::Relaxed) {
786-
let status = IRIS_STATUS.get_current();
787-
788-
// Check if we're in a tool execution phase
789-
if let IrisPhase::ToolExecution {
790-
ref tool_name,
791-
ref reason,
792-
} = status.phase
793-
{
794-
// Only send if it's a new tool
795-
if last_tool.as_ref() != Some(tool_name) {
796-
let _ = tx_status.send(IrisTaskResult::ToolStatus {
797-
tool_name: tool_name.clone(),
798-
message: reason.clone(),
799-
});
800-
last_tool = Some(tool_name.clone());
783+
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
784+
785+
loop {
786+
tokio::select! {
787+
_ = cancel_status.cancelled() => break,
788+
_ = interval.tick() => {
789+
let status = IRIS_STATUS.get_current();
790+
791+
// Check if we're in a tool execution phase
792+
if let IrisPhase::ToolExecution {
793+
ref tool_name,
794+
ref reason,
795+
} = status.phase
796+
{
797+
// Only send if it's a new tool
798+
if last_tool.as_ref() != Some(tool_name) {
799+
let _ = tx_status.send(IrisTaskResult::ToolStatus {
800+
tool_name: tool_name.clone(),
801+
message: reason.clone(),
802+
});
803+
last_tool = Some(tool_name.clone());
804+
}
805+
}
801806
}
802807
}
803-
804-
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
805808
}
806809
});
807810

808-
// Spawn a task to listen for content updates from tools
811+
// Spawn a task to listen for content updates from tools (uses select! for zero latency)
809812
tokio::spawn(async move {
810-
while !is_done_updates.load(Ordering::Relaxed) {
811-
match content_rx.try_recv() {
812-
Ok(update) => {
813+
loop {
814+
tokio::select! {
815+
_ = cancel_updates.cancelled() => break,
816+
update = content_rx.recv() => {
817+
let Some(update) = update else { break };
813818
let chat_update = match update {
814819
ContentUpdate::Commit {
815820
emoji,
@@ -834,10 +839,6 @@ impl StudioApp {
834839
};
835840
let _ = tx_updates.send(IrisTaskResult::ChatUpdate(chat_update));
836841
}
837-
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
838-
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
839-
}
840-
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
841842
}
842843
}
843844
});
@@ -926,8 +927,8 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
926927
}
927928
}
928929

929-
// Signal that we're done so the status polling task stops
930-
is_done.store(true, Ordering::Relaxed);
930+
// Signal that we're done so the helper tasks stop
931+
cancel_token.cancel();
931932
});
932933
}
933934

0 commit comments

Comments
 (0)