Skip to content

Commit 443ed16

Browse files
committed
🚀 Unleash real-time streaming for agent task execution in Studio
Forge a new frontier in AI interaction with buttery-smooth streaming callbacks that bring LLM output to life in the TUI as it happens. The new execute_task_streaming method on IrisAgent opens the floodgates for real-time token delivery, transforming static waits into dynamic conversations. This is how AI should feel—responsive, alive, and instantaneous.
1 parent 5a15d42 commit 443ed16

File tree

3 files changed

+234
-12
lines changed

3 files changed

+234
-12
lines changed

src/agents/iris.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,125 @@ Guidelines:
804804
}
805805
}
806806

807+
/// Execute a task with streaming, calling the callback with each text chunk
808+
///
809+
/// This enables real-time display of LLM output in the TUI.
810+
/// The callback receives `(chunk, aggregated_text)` for each delta.
811+
///
812+
/// Returns the final structured response after streaming completes.
813+
pub async fn execute_task_streaming<F>(
814+
&mut self,
815+
capability: &str,
816+
user_prompt: &str,
817+
mut on_chunk: F,
818+
) -> Result<StructuredResponse>
819+
where
820+
F: FnMut(&str, &str) + Send,
821+
{
822+
use crate::agents::status::IrisPhase;
823+
use crate::messages::get_capability_message;
824+
use futures::StreamExt;
825+
use rig::agent::MultiTurnStreamItem;
826+
use rig::streaming::{StreamedAssistantContent, StreamingPrompt};
827+
828+
// Show initializing status
829+
let waiting_msg = get_capability_message(capability);
830+
crate::iris_status_dynamic!(IrisPhase::Initializing, waiting_msg.text, 1, 4);
831+
832+
// Load the capability config
833+
let (mut system_prompt, output_type) = self.load_capability_config(capability)?;
834+
835+
// Inject style instructions
836+
self.inject_style_instructions(&mut system_prompt, capability);
837+
838+
// Set current capability
839+
self.current_capability = Some(capability.to_string());
840+
841+
// Update status
842+
crate::iris_status_dynamic!(
843+
IrisPhase::Analysis,
844+
"🔍 Iris is analyzing your changes...",
845+
2,
846+
4
847+
);
848+
849+
// Build the agent
850+
let agent = std::sync::Arc::new(self.build_agent()?);
851+
852+
// Build the full prompt (simplified for streaming - no JSON schema enforcement)
853+
let full_prompt = format!(
854+
"{}\n\n{}\n\n\
855+
After using the available tools, respond with your analysis in markdown format.\n\
856+
Keep it clear, well-structured, and informative.",
857+
system_prompt, user_prompt
858+
);
859+
860+
// Update status
861+
let gen_msg = get_capability_message(capability);
862+
crate::iris_status_dynamic!(IrisPhase::Generation, gen_msg.text, 3, 4);
863+
864+
// Use streaming prompt
865+
let mut stream = agent.stream_prompt(&full_prompt).multi_turn(50).await;
866+
867+
let mut aggregated_text = String::new();
868+
869+
// Consume the stream
870+
while let Some(item) = stream.next().await {
871+
match item {
872+
Ok(MultiTurnStreamItem::StreamAssistantItem(StreamedAssistantContent::Text(
873+
text,
874+
))) => {
875+
aggregated_text.push_str(&text.text);
876+
on_chunk(&text.text, &aggregated_text);
877+
}
878+
Ok(MultiTurnStreamItem::FinalResponse(_)) => {
879+
// Stream complete
880+
break;
881+
}
882+
Err(e) => {
883+
return Err(anyhow::anyhow!("Streaming error: {}", e));
884+
}
885+
_ => {
886+
// Tool calls, reasoning, etc. - continue
887+
}
888+
}
889+
}
890+
891+
// Update status
892+
crate::iris_status_dynamic!(
893+
IrisPhase::Synthesis,
894+
"✨ Iris is synthesizing results...",
895+
4,
896+
4
897+
);
898+
899+
// Convert the aggregated text to structured response based on output type
900+
let response = match output_type.as_str() {
901+
"MarkdownReview" => StructuredResponse::MarkdownReview(crate::types::MarkdownReview {
902+
content: aggregated_text,
903+
}),
904+
"MarkdownPullRequest" => {
905+
StructuredResponse::PullRequest(crate::types::MarkdownPullRequest {
906+
content: aggregated_text,
907+
})
908+
}
909+
"MarkdownChangelog" => StructuredResponse::Changelog(crate::types::MarkdownChangelog {
910+
content: aggregated_text,
911+
}),
912+
"MarkdownReleaseNotes" => {
913+
StructuredResponse::ReleaseNotes(crate::types::MarkdownReleaseNotes {
914+
content: aggregated_text,
915+
})
916+
}
917+
"SemanticBlame" => StructuredResponse::SemanticBlame(aggregated_text),
918+
_ => StructuredResponse::PlainText(aggregated_text),
919+
};
920+
921+
crate::iris_status_completed!();
922+
923+
Ok(response)
924+
}
925+
807926
/// Load capability configuration from embedded TOML, returning both prompt and output type
808927
fn load_capability_config(&self, capability: &str) -> Result<(String, String)> {
809928
let _ = self; // Keep &self for method syntax consistency

src/agents/setup.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,34 @@ impl IrisAgentService {
372372
agent.execute_task("chat", task_prompt).await
373373
}
374374

375+
/// Execute an agent task with streaming
376+
///
377+
/// This method streams LLM output in real-time, calling the callback with each
378+
/// text chunk as it arrives. Ideal for TUI display of generation progress.
379+
///
380+
/// # Arguments
381+
/// * `capability` - The capability to invoke (e.g., "review", "pr", "changelog")
382+
/// * `context` - Structured context describing what to analyze
383+
/// * `on_chunk` - Callback receiving `(chunk, aggregated_text)` for each delta
384+
///
385+
/// # Returns
386+
/// The final structured response after streaming completes
387+
pub async fn execute_task_streaming<F>(
388+
&self,
389+
capability: &str,
390+
context: TaskContext,
391+
on_chunk: F,
392+
) -> Result<StructuredResponse>
393+
where
394+
F: FnMut(&str, &str) + Send,
395+
{
396+
let mut agent = self.create_agent()?;
397+
let task_prompt = Self::build_task_prompt(capability, &context);
398+
agent
399+
.execute_task_streaming(capability, &task_prompt, on_chunk)
400+
.await
401+
}
402+
375403
/// Get the configuration
376404
pub fn config(&self) -> &Config {
377405
&self.config

src/studio/app.rs

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -940,7 +940,7 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
940940
}
941941
}
942942

943-
/// Spawn a task for code review generation
943+
/// Spawn a task for code review generation with streaming
944944
fn spawn_review_generation(&self, from_ref: String, to_ref: String) {
945945
use crate::agents::{StructuredResponse, TaskContext};
946946

@@ -953,6 +953,7 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
953953
};
954954

955955
let tx = self.iris_result_tx.clone();
956+
let streaming_tx = tx.clone();
956957

957958
tokio::spawn(async move {
958959
// Use review context with specified refs
@@ -964,9 +965,29 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
964965
}
965966
};
966967

967-
// Execute the review capability
968-
match agent.execute_task("review", context).await {
968+
// Execute with streaming - send chunks as they arrive
969+
let on_chunk = {
970+
let tx = streaming_tx.clone();
971+
move |chunk: &str, aggregated: &str| {
972+
let _ = tx.send(IrisTaskResult::StreamingChunk {
973+
task_type: TaskType::Review,
974+
chunk: chunk.to_string(),
975+
aggregated: aggregated.to_string(),
976+
});
977+
}
978+
};
979+
980+
match agent
981+
.execute_task_streaming("review", context, on_chunk)
982+
.await
983+
{
969984
Ok(response) => {
985+
// Send streaming complete
986+
let _ = tx.send(IrisTaskResult::StreamingComplete {
987+
task_type: TaskType::Review,
988+
});
989+
990+
// Also send final content
970991
let review_text = match response {
971992
StructuredResponse::MarkdownReview(review) => review.content,
972993
StructuredResponse::PlainText(text) => text,
@@ -981,7 +1002,7 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
9811002
});
9821003
}
9831004

984-
/// Spawn a task for PR description generation
1005+
/// Spawn a task for PR description generation with streaming
9851006
fn spawn_pr_generation(&self, base_branch: String, _to_ref: String) {
9861007
use crate::agents::{StructuredResponse, TaskContext};
9871008

@@ -994,14 +1015,30 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
9941015
};
9951016

9961017
let tx = self.iris_result_tx.clone();
1018+
let streaming_tx = tx.clone();
9971019

9981020
tokio::spawn(async move {
9991021
// Build context for PR (comparing current branch to base)
10001022
let context = TaskContext::for_pr(Some(base_branch), None);
10011023

1002-
// Execute the PR capability
1003-
match agent.execute_task("pr", context).await {
1024+
// Execute with streaming
1025+
let on_chunk = {
1026+
let tx = streaming_tx.clone();
1027+
move |chunk: &str, aggregated: &str| {
1028+
let _ = tx.send(IrisTaskResult::StreamingChunk {
1029+
task_type: TaskType::PR,
1030+
chunk: chunk.to_string(),
1031+
aggregated: aggregated.to_string(),
1032+
});
1033+
}
1034+
};
1035+
1036+
match agent.execute_task_streaming("pr", context, on_chunk).await {
10041037
Ok(response) => {
1038+
let _ = tx.send(IrisTaskResult::StreamingComplete {
1039+
task_type: TaskType::PR,
1040+
});
1041+
10051042
let pr_text = match response {
10061043
StructuredResponse::PullRequest(pr) => pr.content,
10071044
StructuredResponse::PlainText(text) => text,
@@ -1016,7 +1053,7 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
10161053
});
10171054
}
10181055

1019-
/// Spawn a task for changelog generation
1056+
/// Spawn a task for changelog generation with streaming
10201057
fn spawn_changelog_generation(&self, from_ref: String, to_ref: String) {
10211058
use crate::agents::{StructuredResponse, TaskContext};
10221059

@@ -1029,14 +1066,33 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
10291066
};
10301067

10311068
let tx = self.iris_result_tx.clone();
1069+
let streaming_tx = tx.clone();
10321070

10331071
tokio::spawn(async move {
10341072
// Build context for changelog (comparing two refs)
10351073
let context = TaskContext::for_changelog(from_ref, Some(to_ref));
10361074

1037-
// Execute the changelog capability
1038-
match agent.execute_task("changelog", context).await {
1075+
// Execute with streaming
1076+
let on_chunk = {
1077+
let tx = streaming_tx.clone();
1078+
move |chunk: &str, aggregated: &str| {
1079+
let _ = tx.send(IrisTaskResult::StreamingChunk {
1080+
task_type: TaskType::Changelog,
1081+
chunk: chunk.to_string(),
1082+
aggregated: aggregated.to_string(),
1083+
});
1084+
}
1085+
};
1086+
1087+
match agent
1088+
.execute_task_streaming("changelog", context, on_chunk)
1089+
.await
1090+
{
10391091
Ok(response) => {
1092+
let _ = tx.send(IrisTaskResult::StreamingComplete {
1093+
task_type: TaskType::Changelog,
1094+
});
1095+
10401096
let changelog_text = match response {
10411097
StructuredResponse::Changelog(cl) => cl.content,
10421098
StructuredResponse::PlainText(text) => text,
@@ -1051,7 +1107,7 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
10511107
});
10521108
}
10531109

1054-
/// Spawn a task for release notes generation
1110+
/// Spawn a task for release notes generation with streaming
10551111
fn spawn_release_notes_generation(&self, from_ref: String, to_ref: String) {
10561112
use crate::agents::{StructuredResponse, TaskContext};
10571113

@@ -1064,14 +1120,33 @@ Simply call the appropriate tool with the new content. Do NOT echo back the full
10641120
};
10651121

10661122
let tx = self.iris_result_tx.clone();
1123+
let streaming_tx = tx.clone();
10671124

10681125
tokio::spawn(async move {
10691126
// Build context for release notes (comparing two refs)
10701127
let context = TaskContext::for_changelog(from_ref, Some(to_ref));
10711128

1072-
// Execute the release_notes capability
1073-
match agent.execute_task("release_notes", context).await {
1129+
// Execute with streaming
1130+
let on_chunk = {
1131+
let tx = streaming_tx.clone();
1132+
move |chunk: &str, aggregated: &str| {
1133+
let _ = tx.send(IrisTaskResult::StreamingChunk {
1134+
task_type: TaskType::ReleaseNotes,
1135+
chunk: chunk.to_string(),
1136+
aggregated: aggregated.to_string(),
1137+
});
1138+
}
1139+
};
1140+
1141+
match agent
1142+
.execute_task_streaming("release_notes", context, on_chunk)
1143+
.await
1144+
{
10741145
Ok(response) => {
1146+
let _ = tx.send(IrisTaskResult::StreamingComplete {
1147+
task_type: TaskType::ReleaseNotes,
1148+
});
1149+
10751150
let release_notes_text = match response {
10761151
StructuredResponse::ReleaseNotes(rn) => rn.content,
10771152
StructuredResponse::PlainText(text) => text,

0 commit comments

Comments
 (0)