Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
35/// Wrapper that displays only the tokens of a parsed expr.
36///
37/// Boxes `syn::Type` which is ~240 bytes.
38#[derive(Clone, Hash)]
39pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42    fn from(expr: syn::Expr) -> Self {
43        Self(Box::new(expr))
44    }
45}
46
47impl Deref for DebugExpr {
48    type Target = syn::Expr;
49
50    fn deref(&self) -> &Self::Target {
51        &self.0
52    }
53}
54
55impl ToTokens for DebugExpr {
56    fn to_tokens(&self, tokens: &mut TokenStream) {
57        self.0.to_tokens(tokens);
58    }
59}
60
61impl Debug for DebugExpr {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        write!(f, "{}", self.0.to_token_stream())
64    }
65}
66
67impl Display for DebugExpr {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        let original = self.0.as_ref().clone();
70        let simplified = simplify_q_macro(original);
71
72        // For now, just use quote formatting without trying to parse as a statement
73        // This avoids the syn::parse_quote! issues entirely
74        write!(f, "q!({})", quote::quote!(#simplified))
75    }
76}
77
78/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
79fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80    // Try to parse the token string as a syn::Expr
81    // Use a visitor to simplify q! macro expansions
82    let mut simplifier = QMacroSimplifier::new();
83    simplifier.visit_expr_mut(&mut expr);
84
85    // If we found and simplified a q! macro, return the simplified version
86    if let Some(simplified) = simplifier.simplified_result {
87        simplified
88    } else {
89        expr
90    }
91}
92
93/// AST visitor that simplifies q! macro expansions
94#[derive(Default)]
95pub struct QMacroSimplifier {
96    pub simplified_result: Option<syn::Expr>,
97}
98
99impl QMacroSimplifier {
100    pub fn new() -> Self {
101        Self::default()
102    }
103}
104
105impl VisitMut for QMacroSimplifier {
106    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107        // Check if we already found a result to avoid further processing
108        if self.simplified_result.is_some() {
109            return;
110        }
111
112        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113            // Look for calls to stageleft::runtime_support::fn*
114            && self.is_stageleft_runtime_support_call(&path_expr.path)
115            // Try to extract the closure from the arguments
116            && let Some(closure) = self.extract_closure_from_args(&call.args)
117        {
118            self.simplified_result = Some(closure);
119            return;
120        }
121
122        // Continue visiting child expressions using the default implementation
123        // Use the default visitor to avoid infinite recursion
124        syn::visit_mut::visit_expr_mut(self, expr);
125    }
126}
127
128impl QMacroSimplifier {
129    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130        // Check if this is a call to stageleft::runtime_support::fn*
131        if let Some(last_segment) = path.segments.last() {
132            let fn_name = last_segment.ident.to_string();
133            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
134            fn_name.contains("_type_hint")
135                && path.segments.len() > 2
136                && path.segments[0].ident == "stageleft"
137                && path.segments[1].ident == "runtime_support"
138        } else {
139            false
140        }
141    }
142
143    fn extract_closure_from_args(
144        &self,
145        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146    ) -> Option<syn::Expr> {
147        // Look through the arguments for a closure expression
148        for arg in args {
149            if let syn::Expr::Closure(_) = arg {
150                return Some(arg.clone());
151            }
152            // Also check for closures nested in other expressions (like blocks)
153            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154                return Some(closure_expr);
155            }
156        }
157        None
158    }
159
160    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161        let mut visitor = ClosureFinder {
162            found_closure: None,
163            prefer_inner_blocks: true,
164        };
165        visitor.visit_expr(expr);
166        visitor.found_closure
167    }
168}
169
170/// Visitor that finds closures in expressions with special block handling
171struct ClosureFinder {
172    found_closure: Option<syn::Expr>,
173    prefer_inner_blocks: bool,
174}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178        // If we already found a closure, don't continue searching
179        if self.found_closure.is_some() {
180            return;
181        }
182
183        match expr {
184            syn::Expr::Closure(_) => {
185                self.found_closure = Some(expr.clone());
186            }
187            syn::Expr::Block(block) if self.prefer_inner_blocks => {
188                // Special handling for blocks - look for inner blocks that contain closures
189                for stmt in &block.block.stmts {
190                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
191                        && let syn::Expr::Block(_) = stmt_expr
192                    {
193                        // Check if this nested block contains a closure
194                        let mut inner_visitor = ClosureFinder {
195                            found_closure: None,
196                            prefer_inner_blocks: false, // Avoid infinite recursion
197                        };
198                        inner_visitor.visit_expr(stmt_expr);
199                        if inner_visitor.found_closure.is_some() {
200                            // Found a closure in an inner block, return that block
201                            self.found_closure = Some(stmt_expr.clone());
202                            return;
203                        }
204                    }
205                }
206
207                // If no inner block with closure found, continue with normal visitation
208                visit::visit_expr(self, expr);
209
210                // If we found a closure, just return the closure itself, not the whole block
211                // unless we're in the special case where we want the containing block
212                if self.found_closure.is_some() {
213                    // The closure was found during visitation, no need to wrap in block
214                }
215            }
216            _ => {
217                // Use default visitor behavior for all other expressions
218                visit::visit_expr(self, expr);
219            }
220        }
221    }
222}
223
224/// Debug displays the type's tokens.
225///
226/// Boxes `syn::Type` which is ~320 bytes.
227#[derive(Clone, PartialEq, Eq, Hash)]
228pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231    fn from(t: syn::Type) -> Self {
232        Self(Box::new(t))
233    }
234}
235
236impl Deref for DebugType {
237    type Target = syn::Type;
238
239    fn deref(&self) -> &Self::Target {
240        &self.0
241    }
242}
243
244impl ToTokens for DebugType {
245    fn to_tokens(&self, tokens: &mut TokenStream) {
246        self.0.to_tokens(tokens);
247    }
248}
249
250impl Debug for DebugType {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(f, "{}", self.0.to_token_stream())
253    }
254}
255
256pub enum DebugInstantiate {
257    Building,
258    Finalized(Box<DebugInstantiateFinalized>),
259}
260
261#[cfg_attr(
262    not(feature = "build"),
263    expect(
264        dead_code,
265        reason = "sink, source unused without `feature = \"build\"`."
266    )
267)]
268pub struct DebugInstantiateFinalized {
269    sink: syn::Expr,
270    source: syn::Expr,
271    connect_fn: Option<Box<dyn FnOnce()>>,
272}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275    fn from(f: DebugInstantiateFinalized) -> Self {
276        Self::Finalized(Box::new(f))
277    }
278}
279
280impl Debug for DebugInstantiate {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        write!(f, "<network instantiate>")
283    }
284}
285
286impl Hash for DebugInstantiate {
287    fn hash<H: Hasher>(&self, _state: &mut H) {
288        // Do nothing
289    }
290}
291
292impl Clone for DebugInstantiate {
293    fn clone(&self) -> Self {
294        match self {
295            DebugInstantiate::Building => DebugInstantiate::Building,
296            DebugInstantiate::Finalized(_) => {
297                panic!("DebugInstantiate::Finalized should not be cloned")
298            }
299        }
300    }
301}
302
303/// Tracks the instantiation state of a `ClusterMembers` source.
304///
305/// During `compile_network`, the first `ClusterMembers` node for a given
306/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
307/// receives the expression returned by `Deploy::cluster_membership_stream`.
308/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
309/// during code-gen they simply reference the tee output of the first node
310/// instead of creating a redundant `source_stream`.
311#[derive(Debug, Hash, Clone)]
312pub enum ClusterMembersState {
313    /// Not yet instantiated.
314    Uninit,
315    /// The primary instance: holds the stream expression and will emit
316    /// `source_stream(expr) -> tee()` during code-gen.
317    Stream(DebugExpr),
318    /// A secondary instance that references the tee output of the primary.
319    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
320    /// can derive the deterministic tee ident without extra state.
321    Tee(LocationId, LocationId),
322}
323
324/// A source in a Hydro graph, where data enters the graph.
325#[derive(Debug, Hash, Clone)]
326pub enum HydroSource {
327    Stream(DebugExpr),
328    ExternalNetwork(),
329    Iter(DebugExpr),
330    Spin(),
331    ClusterMembers(LocationId, ClusterMembersState),
332    Embedded(syn::Ident),
333}
334
#[cfg(feature = "build")]
/// Abstraction over the parts of DFIR code-gen that differ between production
/// deployments and simulations.
///
/// Among other things, this lets the simulator fuse all locations into a single
/// DFIR graph, emit a separate graph for each tick, and insert hooks that control
/// non-deterministic operators.
pub trait DfirBuilder {
    /// Whether singletons should be represented together with their intermediate
    /// states.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the DFIR builder for `location`, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits code binding `out_ident` (at `out_location`) to the batched contents
    /// of `in_ident`, a collection of kind `in_kind` located at `in_location`.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits code binding `out_ident` (at `out_location`) to the values of
    /// `in_ident` (at `in_location`) yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits code binding `out_ident` to `in_ident` when entering an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits code binding `out_ident` to `in_ident` when leaving an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits code binding `out_ident` (of kind `out_kind`) to an observation of
    /// the non-deterministic collection `in_ident` (of kind `in_kind`) at
    /// `location`.
    ///
    /// NOTE(review): the exact meaning of `trusted` is not visible here — confirm
    /// against implementors.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Wires a network channel from `from` to `to`.
    ///
    /// On the sender, `input_ident` is optionally mapped through `serialize` and
    /// written into `sink`. On the receiver, `out_ident` is bound to `source`,
    /// optionally mapped through `deserialize`. `tag_id` disambiguates the
    /// generated send/receive operators.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Binds `out_ident` on `on` to data read from `source_expr`, optionally
    /// mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Writes `input_ident` on `on` into `sink_expr`, optionally mapping it
    /// through `serialize` first.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
427
#[cfg(feature = "build")]
/// Production code-gen: one [`FlatGraphBuilder`] per root location, with no
/// simulation hooks.
///
/// Tick exits, atomic sections and non-determinism observations all become plain
/// aliases of their input. `batch` additionally persists bounded singleton-like
/// inputs.
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        // Deployed graphs do not represent intermediate singleton states.
        false
    }

    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        // Nested locations share the graph of their root location.
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        // Bounded singleton-like inputs are persisted with `persist::<'static>()`;
        // everything else is aliased directly.
        let persist_singleton = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        let stmts = if persist_singleton {
            assert!(in_location.is_top_level());
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            }
        } else {
            parse_quote! {
                #out_ident = #in_ident;
            }
        };
        builder.add_dfir(stmts, None, None);
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root())
            .add_dfir(parse_quote!(#out_ident = #in_ident;), None, None);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root())
            .add_dfir(parse_quote!(#out_ident = #in_ident;), None, None);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root())
            .add_dfir(parse_quote!(#out_ident = #in_ident;), None, None);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location)
            .add_dfir(parse_quote!(#out_ident = #in_ident;), None, None);
    }

    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag separates the send and receive halves, which would
        // otherwise share the same `next_stmt_id`.
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        // Optional `-> map(..)` stages; a `None` interpolates to no tokens at all.
        let serialize_stage = serialize.map(|ser| quote! { -> map(#ser) });
        let deserialize_stage = deserialize.map(|de| quote! { -> map(#de) });

        self.get_dfir_mut(from).add_dfir(
            parse_quote! {
                #input_ident #serialize_stage -> dest_sink(#sink);
            },
            None,
            Some(&send_tag),
        );

        self.get_dfir_mut(to).add_dfir(
            parse_quote! {
                #out_ident = source_stream(#source) #deserialize_stage;
            },
            None,
            Some(&recv_tag),
        );
    }

    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{tag_id}");
        let deserialize_stage = deserialize.map(|de| quote! { -> map(#de) });

        self.get_dfir_mut(on).add_dfir(
            parse_quote! {
                #out_ident = source_stream(#source_expr) #deserialize_stage;
            },
            None,
            Some(&recv_tag),
        );
    }

    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // The operator tag separates send and receive, which otherwise have the
        // same `next_stmt_id`.
        let send_tag = format!("send{tag_id}");
        let serialize_stage = serialize.map(|ser| quote! { -> map(#ser) });

        self.get_dfir_mut(on).add_dfir(
            parse_quote! {
                #input_ident #serialize_stage -> dest_sink(#sink_expr);
            },
            None,
            Some(&send_tag),
        );
    }
}
660
#[cfg(feature = "build")]
/// Target of an IR traversal: either the DFIR builders to emit code into, or a
/// pair of callbacks for roots (`L`) and nodes (`N`).
///
/// NOTE(review): the `&mut usize` passed to each callback is presumably a running
/// id/counter — confirm against the traversal code.
pub enum BuildersOrCallback<
    'a,
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
> {
    /// Emit DFIR code through the given [`DfirBuilder`].
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke `L` on roots and `N` on nodes instead of emitting code.
    Callback(L, N),
}
670
671/// An root in a Hydro graph, which is an pipeline that doesn't emit
672/// any downstream values. Traversals over the dataflow graph and
673/// generating DFIR IR start from roots.
674#[derive(Debug, Hash)]
675pub enum HydroRoot {
676    ForEach {
677        f: DebugExpr,
678        input: Box<HydroNode>,
679        op_metadata: HydroIrOpMetadata,
680    },
681    SendExternal {
682        to_external_key: LocationKey,
683        to_port_id: ExternalPortId,
684        to_many: bool,
685        unpaired: bool,
686        serialize_fn: Option<DebugExpr>,
687        instantiate_fn: DebugInstantiate,
688        input: Box<HydroNode>,
689        op_metadata: HydroIrOpMetadata,
690    },
691    DestSink {
692        sink: DebugExpr,
693        input: Box<HydroNode>,
694        op_metadata: HydroIrOpMetadata,
695    },
696    CycleSink {
697        cycle_id: CycleId,
698        input: Box<HydroNode>,
699        op_metadata: HydroIrOpMetadata,
700    },
701    EmbeddedOutput {
702        ident: syn::Ident,
703        input: Box<HydroNode>,
704        op_metadata: HydroIrOpMetadata,
705    },
706}
707
708impl HydroRoot {
709    #[cfg(feature = "build")]
710    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
711    pub fn compile_network<'a, D>(
712        &mut self,
713        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
714        seen_tees: &mut SeenTees,
715        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
716        processes: &SparseSecondaryMap<LocationKey, D::Process>,
717        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
718        externals: &SparseSecondaryMap<LocationKey, D::External>,
719        env: &mut D::InstantiateEnv,
720    ) where
721        D: Deploy<'a>,
722    {
723        let refcell_extra_stmts = RefCell::new(extra_stmts);
724        let refcell_env = RefCell::new(env);
725        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
726        self.transform_bottom_up(
727            &mut |l| {
728                if let HydroRoot::SendExternal {
729                    input,
730                    to_external_key,
731                    to_port_id,
732                    to_many,
733                    unpaired,
734                    instantiate_fn,
735                    ..
736                } = l
737                {
738                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
739                        DebugInstantiate::Building => {
740                            let to_node = externals
741                                .get(*to_external_key)
742                                .unwrap_or_else(|| {
743                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
744                                })
745                                .clone();
746
747                            match input.metadata().location_id.root() {
748                                &LocationId::Process(process_key) => {
749                                    if *to_many {
750                                        (
751                                            (
752                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
753                                                parse_quote!(DUMMY),
754                                            ),
755                                            Box::new(|| {}) as Box<dyn FnOnce()>,
756                                        )
757                                    } else {
758                                        let from_node = processes
759                                            .get(process_key)
760                                            .unwrap_or_else(|| {
761                                                panic!("A process used in the graph was not instantiated: {}", process_key)
762                                            })
763                                            .clone();
764
765                                        let sink_port = from_node.next_port();
766                                        let source_port = to_node.next_port();
767
768                                        if *unpaired {
769                                            use stageleft::quote_type;
770                                            use tokio_util::codec::LengthDelimitedCodec;
771
772                                            to_node.register(*to_port_id, source_port.clone());
773
774                                            let _ = D::e2o_source(
775                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
776                                                &to_node, &source_port,
777                                                &from_node, &sink_port,
778                                                &quote_type::<LengthDelimitedCodec>(),
779                                                format!("{}_{}", *to_external_key, *to_port_id)
780                                            );
781                                        }
782
783                                        (
784                                            (
785                                                D::o2e_sink(
786                                                    &from_node,
787                                                    &sink_port,
788                                                    &to_node,
789                                                    &source_port,
790                                                    format!("{}_{}", *to_external_key, *to_port_id)
791                                                ),
792                                                parse_quote!(DUMMY),
793                                            ),
794                                            if *unpaired {
795                                                D::e2o_connect(
796                                                    &to_node,
797                                                    &source_port,
798                                                    &from_node,
799                                                    &sink_port,
800                                                    *to_many,
801                                                    NetworkHint::Auto,
802                                                )
803                                            } else {
804                                                Box::new(|| {}) as Box<dyn FnOnce()>
805                                            },
806                                        )
807                                    }
808                                }
809                                LocationId::Cluster(_) => todo!(),
810                                _ => panic!()
811                            }
812                        },
813
814                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
815                    };
816
817                    *instantiate_fn = DebugInstantiateFinalized {
818                        sink: sink_expr,
819                        source: source_expr,
820                        connect_fn: Some(connect_fn),
821                    }
822                    .into();
823                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
824                    let element_type = match &input.metadata().collection_kind {
825                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
826                        _ => panic!("Embedded output must have Stream collection kind"),
827                    };
828                    let location_key = match input.metadata().location_id.root() {
829                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
830                        _ => panic!("Embedded output must be on a process or cluster"),
831                    };
832                    D::register_embedded_output(
833                        &mut refcell_env.borrow_mut(),
834                        location_key,
835                        ident,
836                        &element_type,
837                    );
838                }
839            },
840            &mut |n| {
841                if let HydroNode::Network {
842                    name,
843                    input,
844                    instantiate_fn,
845                    metadata,
846                    ..
847                } = n
848                {
849                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
850                        DebugInstantiate::Building => instantiate_network::<D>(
851                            &mut refcell_env.borrow_mut(),
852                            input.metadata().location_id.root(),
853                            metadata.location_id.root(),
854                            processes,
855                            clusters,
856                            name.as_deref(),
857                        ),
858
859                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
860                    };
861
862                    *instantiate_fn = DebugInstantiateFinalized {
863                        sink: sink_expr,
864                        source: source_expr,
865                        connect_fn: Some(connect_fn),
866                    }
867                    .into();
868                } else if let HydroNode::ExternalInput {
869                    from_external_key,
870                    from_port_id,
871                    from_many,
872                    codec_type,
873                    port_hint,
874                    instantiate_fn,
875                    metadata,
876                    ..
877                } = n
878                {
879                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
880                        DebugInstantiate::Building => {
881                            let from_node = externals
882                                .get(*from_external_key)
883                                .unwrap_or_else(|| {
884                                    panic!(
885                                        "A external used in the graph was not instantiated: {}",
886                                        from_external_key,
887                                    )
888                                })
889                                .clone();
890
891                            match metadata.location_id.root() {
892                                &LocationId::Process(process_key) => {
893                                    let to_node = processes
894                                        .get(process_key)
895                                        .unwrap_or_else(|| {
896                                            panic!("A process used in the graph was not instantiated: {}", process_key)
897                                        })
898                                        .clone();
899
900                                    let sink_port = from_node.next_port();
901                                    let source_port = to_node.next_port();
902
903                                    from_node.register(*from_port_id, sink_port.clone());
904
905                                    (
906                                        (
907                                            parse_quote!(DUMMY),
908                                            if *from_many {
909                                                D::e2o_many_source(
910                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
911                                                    &to_node, &source_port,
912                                                    codec_type.0.as_ref(),
913                                                    format!("{}_{}", *from_external_key, *from_port_id)
914                                                )
915                                            } else {
916                                                D::e2o_source(
917                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
918                                                    &from_node, &sink_port,
919                                                    &to_node, &source_port,
920                                                    codec_type.0.as_ref(),
921                                                    format!("{}_{}", *from_external_key, *from_port_id)
922                                                )
923                                            },
924                                        ),
925                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
926                                    )
927                                }
928                                LocationId::Cluster(_) => todo!(),
929                                _ => panic!()
930                            }
931                        },
932
933                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
934                    };
935
936                    *instantiate_fn = DebugInstantiateFinalized {
937                        sink: sink_expr,
938                        source: source_expr,
939                        connect_fn: Some(connect_fn),
940                    }
941                    .into();
942                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
943                    let element_type = match &metadata.collection_kind {
944                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
945                        _ => panic!("Embedded source must have Stream collection kind"),
946                    };
947                    let location_key = match metadata.location_id.root() {
948                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
949                        _ => panic!("Embedded source must be on a process or cluster"),
950                    };
951                    D::register_embedded_input(
952                        &mut refcell_env.borrow_mut(),
953                        location_key,
954                        ident,
955                        &element_type,
956                    );
957                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
958                    match state {
959                        ClusterMembersState::Uninit => {
960                            let at_location = metadata.location_id.root().clone();
961                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
962                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
963                                // First occurrence: call cluster_membership_stream and mark as Stream.
964                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
965                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
966                                    &(),
967                                );
968                                *state = ClusterMembersState::Stream(expr.into());
969                            } else {
970                                // Already instantiated for this (at, target) pair: just tee.
971                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
972                            }
973                        }
974                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
975                            panic!("cluster members already finalized");
976                        }
977                    }
978                }
979            },
980            seen_tees,
981            false,
982        );
983    }
984
985    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
986        self.transform_bottom_up(
987            &mut |l| {
988                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
989                    match instantiate_fn {
990                        DebugInstantiate::Building => panic!("network not built"),
991
992                        DebugInstantiate::Finalized(finalized) => {
993                            (finalized.connect_fn.take().unwrap())();
994                        }
995                    }
996                }
997            },
998            &mut |n| {
999                if let HydroNode::Network { instantiate_fn, .. }
1000                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1001                {
1002                    match instantiate_fn {
1003                        DebugInstantiate::Building => panic!("network not built"),
1004
1005                        DebugInstantiate::Finalized(finalized) => {
1006                            (finalized.connect_fn.take().unwrap())();
1007                        }
1008                    }
1009                }
1010            },
1011            seen_tees,
1012            false,
1013        );
1014    }
1015
1016    pub fn transform_bottom_up(
1017        &mut self,
1018        transform_root: &mut impl FnMut(&mut HydroRoot),
1019        transform_node: &mut impl FnMut(&mut HydroNode),
1020        seen_tees: &mut SeenTees,
1021        check_well_formed: bool,
1022    ) {
1023        self.transform_children(
1024            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1025            seen_tees,
1026        );
1027
1028        transform_root(self);
1029    }
1030
1031    pub fn transform_children(
1032        &mut self,
1033        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1034        seen_tees: &mut SeenTees,
1035    ) {
1036        match self {
1037            HydroRoot::ForEach { input, .. }
1038            | HydroRoot::SendExternal { input, .. }
1039            | HydroRoot::DestSink { input, .. }
1040            | HydroRoot::CycleSink { input, .. }
1041            | HydroRoot::EmbeddedOutput { input, .. } => {
1042                transform(input, seen_tees);
1043            }
1044        }
1045    }
1046
1047    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
1048        match self {
1049            HydroRoot::ForEach {
1050                f,
1051                input,
1052                op_metadata,
1053            } => HydroRoot::ForEach {
1054                f: f.clone(),
1055                input: Box::new(input.deep_clone(seen_tees)),
1056                op_metadata: op_metadata.clone(),
1057            },
1058            HydroRoot::SendExternal {
1059                to_external_key,
1060                to_port_id,
1061                to_many,
1062                unpaired,
1063                serialize_fn,
1064                instantiate_fn,
1065                input,
1066                op_metadata,
1067            } => HydroRoot::SendExternal {
1068                to_external_key: *to_external_key,
1069                to_port_id: *to_port_id,
1070                to_many: *to_many,
1071                unpaired: *unpaired,
1072                serialize_fn: serialize_fn.clone(),
1073                instantiate_fn: instantiate_fn.clone(),
1074                input: Box::new(input.deep_clone(seen_tees)),
1075                op_metadata: op_metadata.clone(),
1076            },
1077            HydroRoot::DestSink {
1078                sink,
1079                input,
1080                op_metadata,
1081            } => HydroRoot::DestSink {
1082                sink: sink.clone(),
1083                input: Box::new(input.deep_clone(seen_tees)),
1084                op_metadata: op_metadata.clone(),
1085            },
1086            HydroRoot::CycleSink {
1087                cycle_id,
1088                input,
1089                op_metadata,
1090            } => HydroRoot::CycleSink {
1091                cycle_id: *cycle_id,
1092                input: Box::new(input.deep_clone(seen_tees)),
1093                op_metadata: op_metadata.clone(),
1094            },
1095            HydroRoot::EmbeddedOutput {
1096                ident,
1097                input,
1098                op_metadata,
1099            } => HydroRoot::EmbeddedOutput {
1100                ident: ident.clone(),
1101                input: Box::new(input.deep_clone(seen_tees)),
1102                op_metadata: op_metadata.clone(),
1103            },
1104        }
1105    }
1106
1107    #[cfg(feature = "build")]
1108    pub fn emit(
1109        &mut self,
1110        graph_builders: &mut dyn DfirBuilder,
1111        seen_tees: &mut SeenTees,
1112        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1113        next_stmt_id: &mut usize,
1114    ) {
1115        self.emit_core(
1116            &mut BuildersOrCallback::<
1117                fn(&mut HydroRoot, &mut usize),
1118                fn(&mut HydroNode, &mut usize),
1119            >::Builders(graph_builders),
1120            seen_tees,
1121            built_tees,
1122            next_stmt_id,
1123        );
1124    }
1125
    /// Emits this root, after recursively emitting its input subgraph, in one of two modes.
    ///
    /// * `BuildersOrCallback::Builders`: appends the corresponding DFIR statement to the
    ///   builder for the input's location.
    /// * `BuildersOrCallback::Callback`: passes the root to the root callback together with its
    ///   statement id instead of generating code.
    ///
    /// `seen_tees` and `built_tees` are only forwarded to [`HydroNode::emit_core`].
    /// `built_tees` is keyed by a tee's shared `RefCell` address (presumably caching the ident
    /// each tee was emitted as). `next_stmt_id` is the statement id for this root; it is
    /// incremented afterwards for every variant except `CycleSink`, which takes no id.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        seen_tees: &mut SeenTees,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                // Emit the input subgraph first; we get back the ident to pipe from.
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Before network instantiation a placeholder sink is emitted; once
                        // finalized, the real sink expression is used. The source half is unused.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // The element type annotated on the cycle's `identity` operator: keyed
                        // collections are emitted as `(key, value)` tuples.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        // Bind the input to the cycle's named ident so the matching cycle source
                        // can refer to it; this statement is emitted without a statement id.
                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // No ID, no callback
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Each element is handed to the embedder-provided `#ident` by `&mut`.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }
        }
    }
1293
1294    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1295        match self {
1296            HydroRoot::ForEach { op_metadata, .. }
1297            | HydroRoot::SendExternal { op_metadata, .. }
1298            | HydroRoot::DestSink { op_metadata, .. }
1299            | HydroRoot::CycleSink { op_metadata, .. }
1300            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1301        }
1302    }
1303
1304    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1305        match self {
1306            HydroRoot::ForEach { op_metadata, .. }
1307            | HydroRoot::SendExternal { op_metadata, .. }
1308            | HydroRoot::DestSink { op_metadata, .. }
1309            | HydroRoot::CycleSink { op_metadata, .. }
1310            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1311        }
1312    }
1313
1314    pub fn input(&self) -> &HydroNode {
1315        match self {
1316            HydroRoot::ForEach { input, .. }
1317            | HydroRoot::SendExternal { input, .. }
1318            | HydroRoot::DestSink { input, .. }
1319            | HydroRoot::CycleSink { input, .. }
1320            | HydroRoot::EmbeddedOutput { input, .. } => input,
1321        }
1322    }
1323
1324    pub fn input_metadata(&self) -> &HydroIrMetadata {
1325        self.input().metadata()
1326    }
1327
1328    pub fn print_root(&self) -> String {
1329        match self {
1330            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1331            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1332            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1333            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1334            HydroRoot::EmbeddedOutput { ident, .. } => {
1335                format!("EmbeddedOutput({})", ident)
1336            }
1337        }
1338    }
1339
1340    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1341        match self {
1342            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1343                transform(f);
1344            }
1345            HydroRoot::SendExternal { .. }
1346            | HydroRoot::CycleSink { .. }
1347            | HydroRoot::EmbeddedOutput { .. } => {}
1348        }
1349    }
1350}
1351
/// Emits every root in `ir` into a fresh map of per-location [`FlatGraphBuilder`]s.
///
/// Tee bookkeeping and the statement-id counter are shared across all roots, so statement ids
/// are assigned in root order over the whole program.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1368
/// Walks `ir` in emission order, calling `transform_root` for each root and `transform_node`
/// for each node (with its statement id) instead of generating DFIR code.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callbacks,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1388
1389pub fn transform_bottom_up(
1390    ir: &mut [HydroRoot],
1391    transform_root: &mut impl FnMut(&mut HydroRoot),
1392    transform_node: &mut impl FnMut(&mut HydroNode),
1393    check_well_formed: bool,
1394) {
1395    let mut seen_tees = HashMap::new();
1396    ir.iter_mut().for_each(|leaf| {
1397        leaf.transform_bottom_up(
1398            transform_root,
1399            transform_node,
1400            &mut seen_tees,
1401            check_well_formed,
1402        );
1403    });
1404}
1405
1406pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1407    let mut seen_tees = HashMap::new();
1408    ir.iter()
1409        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1410        .collect()
1411}
1412
/// Per-thread tee-deduplication state read by `TeeNode`'s `Debug` impl.
///
/// `None` means deduplication is off. Inside [`dbg_dedup_tee`] it holds the next tee id to
/// assign and a map from each already-printed tee's `RefCell` address to its assigned id.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts disabled; `dbg_dedup_tee` switches it on for the duration of a call.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1417
1418pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1419    PRINTED_TEES.with(|printed_tees| {
1420        let mut printed_tees_mut = printed_tees.borrow_mut();
1421        *printed_tees_mut = Some((0, HashMap::new()));
1422        drop(printed_tees_mut);
1423
1424        let ret = f();
1425
1426        let mut printed_tees_mut = printed_tees.borrow_mut();
1427        *printed_tees_mut = None;
1428
1429        ret
1430    })
1431}
1432
1433pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1434
1435impl TeeNode {
1436    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1437        Rc::as_ptr(&self.0)
1438    }
1439}
1440
1441impl Debug for TeeNode {
1442    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1443        PRINTED_TEES.with(|printed_tees| {
1444            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1445            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1446
1447            if let Some(printed_tees_mut) = printed_tees_mut {
1448                if let Some(existing) = printed_tees_mut
1449                    .1
1450                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1451                {
1452                    write!(f, "<tee {}>", existing)
1453                } else {
1454                    let next_id = printed_tees_mut.0;
1455                    printed_tees_mut.0 += 1;
1456                    printed_tees_mut
1457                        .1
1458                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1459                    drop(printed_tees_mut_borrow);
1460                    write!(f, "<tee {}>: ", next_id)?;
1461                    Debug::fmt(&self.0.borrow(), f)
1462                }
1463            } else {
1464                drop(printed_tees_mut_borrow);
1465                write!(f, "<tee>: ")?;
1466                Debug::fmt(&self.0.borrow(), f)
1467            }
1468        })
1469    }
1470}
1471
1472impl Hash for TeeNode {
1473    fn hash<H: Hasher>(&self, state: &mut H) {
1474        self.0.borrow_mut().hash(state);
1475    }
1476}
1477
/// Boundedness of a stream, singleton, optional, or keyed stream.
///
/// Only `Bounded` makes [`CollectionKind::is_bounded`] return `true` for those kinds.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1483
/// Ordering guarantee of a stream's elements (`CollectionKind::Stream::order`), or of each
/// key's values in a keyed stream (`CollectionKind::KeyedStream::value_order`).
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1489
/// Retry/duplication guarantee of a stream (`CollectionKind::Stream::retry`), or of each
/// key's values in a keyed stream (`CollectionKind::KeyedStream::value_retry`).
///
/// NOTE(review): presumably `AtLeastOnce` allows elements to be duplicated while
/// `ExactlyOnce` does not — confirm against the stream API docs.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1495
/// Boundedness of a keyed singleton.
///
/// Only `Bounded` makes [`CollectionKind::is_bounded`] return `true`. `BoundedValue` and
/// `Unbounded` do not.
/// NOTE(review): `BoundedValue` presumably means each key's value is bounded while the key
/// set itself may still grow — confirm.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1502
/// Shape of the collection produced by an IR node, together with its guarantees and
/// element/key/value types.
///
/// Keyed variants carry separate key and value types. When emitting a cycle sink, these are
/// combined into a `(key, value)` tuple element type.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A single `element_type` value.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// An optional `element_type` value (presumably zero or one element — confirm).
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A stream of values grouped by key; ordering and retry guarantees apply per key's values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A single value per key.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1532
1533impl CollectionKind {
1534    pub fn is_bounded(&self) -> bool {
1535        matches!(
1536            self,
1537            CollectionKind::Stream {
1538                bound: BoundKind::Bounded,
1539                ..
1540            } | CollectionKind::Singleton {
1541                bound: BoundKind::Bounded,
1542                ..
1543            } | CollectionKind::Optional {
1544                bound: BoundKind::Bounded,
1545                ..
1546            } | CollectionKind::KeyedStream {
1547                bound: BoundKind::Bounded,
1548                ..
1549            } | CollectionKind::KeyedSingleton {
1550                bound: KeyedSingletonBoundKind::Bounded,
1551                ..
1552            }
1553        )
1554    }
1555}
1556
/// Metadata attached to an IR node describing where it runs and what collection it produces.
///
/// This metadata is ignored by `Hash`/`PartialEq` (see the impls below), so it never affects
/// node equality or hashing.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location the node's output lives on; emission uses it to pick the DFIR builder.
    pub location_id: LocationId,
    /// Kind and element/key/value types of the node's output collection.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an optional size/cardinality estimate — confirm where it is set.
    pub cardinality: Option<usize>,
    // NOTE(review): presumably an optional user-facing label for the node — confirm.
    pub tag: Option<String>,
    /// Metadata about the operator itself (backtrace, profiling data, id).
    pub op: HydroIrOpMetadata,
}
1565
// HydroIrMetadata shouldn't be used to hash or compare
// All metadata values hash to nothing and compare equal. Nodes that derive `Hash` (and any
// equality built on `PartialEq`) are therefore compared purely on their structure and
// expressions, not on location, types, or bookkeeping. `Hash` and `Eq` stay consistent: equal
// values always hash identically.
impl Hash for HydroIrMetadata {
    fn hash<H: Hasher>(&self, _: &mut H) {}
}

impl PartialEq for HydroIrMetadata {
    // Always equal; see the note above.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}
1578
1579impl Debug for HydroIrMetadata {
1580    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1581        f.debug_struct("HydroIrMetadata")
1582            .field("location_id", &self.location_id)
1583            .field("collection_kind", &self.collection_kind)
1584            .finish()
1585    }
1586}
1587
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the operator was constructed (see `HydroIrOpMetadata::new`).
    pub backtrace: Backtrace,
    // NOTE(review): presumably measured CPU usage filled in by profiling; `None` from `new`.
    pub cpu_usage: Option<f64>,
    // NOTE(review): presumably CPU usage attributed to receiving network input; `None` from
    // `new` — confirm.
    pub network_recv_cpu_usage: Option<f64>,
    // Operator id; `None` from `new`. It appears to be assigned later — confirm where.
    pub id: Option<usize>,
}
1597
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and all other fields `None`.
    ///
    /// Kept as an explicit constructor (no `Default`) so that call sites invoke it directly,
    /// which keeps the backtrace frame-skip count below accurate.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // Skip one extra frame to account for this `new` wrapper.
        Self::new_with_skip(1)
    }

    /// Like `new`, but drops `skip_count` additional caller frames from the backtrace.
    ///
    /// NOTE(review): the fixed `2` presumably skips `new_with_skip` itself plus the frame of
    /// `Backtrace::get_backtrace` — confirm against its semantics. Changing the call structure
    /// here or in `new` shifts which frames are recorded.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1616
1617impl Debug for HydroIrOpMetadata {
1618    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1619        f.debug_struct("HydroIrOpMetadata").finish()
1620    }
1621}
1622
// Operator metadata (backtrace, profiling numbers, id) contributes nothing to a node's hash,
// so nodes deriving `Hash` hash only their structure.
impl Hash for HydroIrOpMetadata {
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1626
1627/// An intermediate node in a Hydro graph, which consumes data
1628/// from upstream nodes and emits data to downstream nodes.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Stand-in left behind while a node is temporarily moved out (for example, while the
    /// body shared behind a `Tee` is being transformed or cloned). `transform_children`
    /// panics if it is asked to traverse one.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        /// If true, we do not need to simulate non-determinism.
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node whose data comes from a [`HydroSource`] (such as a stream expression, an
    /// iterator expression, `spin()`, or cluster membership).
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node whose contents are given by the expression `value`.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// Leaf node referring to the cycle identified by `cycle_id`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Node whose upstream body is shared with other `Tee` nodes through the
    /// reference-counted [`TeeNode`] cell `inner`. Traversals key on the cell's address
    /// (see [`SeenTees`]) so that the shared body is visited only once.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner`; lowered with the graph builder's `begin_atomic`.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner`; lowered with the graph builder's `end_atomic`.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner`; lowered with the graph builder's `batch`, which
    /// receives both the inner location and this node's (output) location.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input wrapper around `inner`; lowered with the graph builder's
    /// `yield_from_tick`.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `first` and `second` (traversed in that order); presumably
    /// emits the elements of `first` followed by those of `second`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `first` and `second` (traversed in that order).
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `left` and `right` (traversed in that order).
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `left` and `right` (traversed in that order).
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node over `left` and `right` (traversed in that order).
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node with a positive input `pos` and a negative input `neg`
    /// (traversed in that order).
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input node with a positive input `pos` and a negative input `neg`
    /// (traversed in that order).
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`; presumably resolves the futures it produces.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`; presumably like `ResolveFutures` but preserving the
    /// input order.
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`, configured by the expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`, configured by the expression `f`.
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`, configured by the expression `f`.
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`, configured by the expression `f`.
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input` with no extra configuration.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input` with no extra configuration.
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`, configured by the expression `f`.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input` with no extra configuration.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input` with no extra configuration.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`, configured by an `init` and an `acc` expression.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`, configured by an `init` and an `acc` expression.
    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`, configured by an `init` and an `acc` expression.
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`, configured by the expression `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node over `input`, configured by the expression `f`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed` (expression `f` over `input`), plus a second input `watermark`
    /// that is traversed after `input`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Cross-location edge: the only node whose `input` may have a different root location
    /// than the node itself (`transform_bottom_up` skips its location check).
    /// `serialize_fn` / `deserialize_fn` are optional expressions, presumably applied on the
    /// sending and receiving sides respectively.
    Network {
        name: Option<String>,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no IR children), presumably receiving data from an external endpoint
    /// identified by `from_external_key` / `from_port_id`.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node over `input`, configured by `tag`, `prefix`, and a `duration`
    /// expression.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1856
/// Memo for traversals over shared `Tee` bodies. It maps the address of a tee's original cell
/// to the cell that replaces it (as built by `HydroNode::transform_children` and
/// `HydroNode::deep_clone`), so each shared body is processed only once.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee's cell to a [`LocationId`]. This is presumably the location of
/// the shared body; its users are not visible in this section.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1859
1860impl HydroNode {
1861    pub fn transform_bottom_up(
1862        &mut self,
1863        transform: &mut impl FnMut(&mut HydroNode),
1864        seen_tees: &mut SeenTees,
1865        check_well_formed: bool,
1866    ) {
1867        self.transform_children(
1868            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1869            seen_tees,
1870        );
1871
1872        transform(self);
1873
1874        let self_location = self.metadata().location_id.root();
1875
1876        if check_well_formed {
1877            match &*self {
1878                HydroNode::Network { .. } => {}
1879                _ => {
1880                    self.input_metadata().iter().for_each(|i| {
1881                        if i.location_id.root() != self_location {
1882                            panic!(
1883                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1884                                i,
1885                                i.location_id.root(),
1886                                self,
1887                                self_location
1888                            )
1889                        }
1890                    });
1891                }
1892            }
1893        }
1894    }
1895
1896    #[inline(always)]
1897    pub fn transform_children(
1898        &mut self,
1899        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1900        seen_tees: &mut SeenTees,
1901    ) {
1902        match self {
1903            HydroNode::Placeholder => {
1904                panic!();
1905            }
1906
1907            HydroNode::Source { .. }
1908            | HydroNode::SingletonSource { .. }
1909            | HydroNode::CycleSource { .. }
1910            | HydroNode::ExternalInput { .. } => {}
1911
1912            HydroNode::Tee { inner, .. } => {
1913                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1914                    *inner = TeeNode(transformed.clone());
1915                } else {
1916                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1917                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1918                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1919                    transform(&mut orig, seen_tees);
1920                    *transformed_cell.borrow_mut() = orig;
1921                    *inner = TeeNode(transformed_cell);
1922                }
1923            }
1924
1925            HydroNode::Cast { inner, .. }
1926            | HydroNode::ObserveNonDet { inner, .. }
1927            | HydroNode::BeginAtomic { inner, .. }
1928            | HydroNode::EndAtomic { inner, .. }
1929            | HydroNode::Batch { inner, .. }
1930            | HydroNode::YieldConcat { inner, .. } => {
1931                transform(inner.as_mut(), seen_tees);
1932            }
1933
1934            HydroNode::Chain { first, second, .. } => {
1935                transform(first.as_mut(), seen_tees);
1936                transform(second.as_mut(), seen_tees);
1937            }
1938
1939            HydroNode::ChainFirst { first, second, .. } => {
1940                transform(first.as_mut(), seen_tees);
1941                transform(second.as_mut(), seen_tees);
1942            }
1943
1944            HydroNode::CrossSingleton { left, right, .. }
1945            | HydroNode::CrossProduct { left, right, .. }
1946            | HydroNode::Join { left, right, .. } => {
1947                transform(left.as_mut(), seen_tees);
1948                transform(right.as_mut(), seen_tees);
1949            }
1950
1951            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1952                transform(pos.as_mut(), seen_tees);
1953                transform(neg.as_mut(), seen_tees);
1954            }
1955
1956            HydroNode::ReduceKeyedWatermark {
1957                input, watermark, ..
1958            } => {
1959                transform(input.as_mut(), seen_tees);
1960                transform(watermark.as_mut(), seen_tees);
1961            }
1962
1963            HydroNode::Map { input, .. }
1964            | HydroNode::ResolveFutures { input, .. }
1965            | HydroNode::ResolveFuturesOrdered { input, .. }
1966            | HydroNode::FlatMap { input, .. }
1967            | HydroNode::Filter { input, .. }
1968            | HydroNode::FilterMap { input, .. }
1969            | HydroNode::Sort { input, .. }
1970            | HydroNode::DeferTick { input, .. }
1971            | HydroNode::Enumerate { input, .. }
1972            | HydroNode::Inspect { input, .. }
1973            | HydroNode::Unique { input, .. }
1974            | HydroNode::Network { input, .. }
1975            | HydroNode::Fold { input, .. }
1976            | HydroNode::Scan { input, .. }
1977            | HydroNode::FoldKeyed { input, .. }
1978            | HydroNode::Reduce { input, .. }
1979            | HydroNode::ReduceKeyed { input, .. }
1980            | HydroNode::Counter { input, .. } => {
1981                transform(input.as_mut(), seen_tees);
1982            }
1983        }
1984    }
1985
1986    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1987        match self {
1988            HydroNode::Placeholder => HydroNode::Placeholder,
1989            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1990                inner: Box::new(inner.deep_clone(seen_tees)),
1991                metadata: metadata.clone(),
1992            },
1993            HydroNode::ObserveNonDet {
1994                inner,
1995                trusted,
1996                metadata,
1997            } => HydroNode::ObserveNonDet {
1998                inner: Box::new(inner.deep_clone(seen_tees)),
1999                trusted: *trusted,
2000                metadata: metadata.clone(),
2001            },
2002            HydroNode::Source { source, metadata } => HydroNode::Source {
2003                source: source.clone(),
2004                metadata: metadata.clone(),
2005            },
2006            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
2007                value: value.clone(),
2008                metadata: metadata.clone(),
2009            },
2010            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2011                cycle_id: *cycle_id,
2012                metadata: metadata.clone(),
2013            },
2014            HydroNode::Tee { inner, metadata } => {
2015                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2016                    HydroNode::Tee {
2017                        inner: TeeNode(transformed.clone()),
2018                        metadata: metadata.clone(),
2019                    }
2020                } else {
2021                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2022                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2023                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2024                    *new_rc.borrow_mut() = cloned;
2025                    HydroNode::Tee {
2026                        inner: TeeNode(new_rc),
2027                        metadata: metadata.clone(),
2028                    }
2029                }
2030            }
2031            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2032                inner: Box::new(inner.deep_clone(seen_tees)),
2033                metadata: metadata.clone(),
2034            },
2035            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2036                inner: Box::new(inner.deep_clone(seen_tees)),
2037                metadata: metadata.clone(),
2038            },
2039            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2040                inner: Box::new(inner.deep_clone(seen_tees)),
2041                metadata: metadata.clone(),
2042            },
2043            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2044                inner: Box::new(inner.deep_clone(seen_tees)),
2045                metadata: metadata.clone(),
2046            },
2047            HydroNode::Chain {
2048                first,
2049                second,
2050                metadata,
2051            } => HydroNode::Chain {
2052                first: Box::new(first.deep_clone(seen_tees)),
2053                second: Box::new(second.deep_clone(seen_tees)),
2054                metadata: metadata.clone(),
2055            },
2056            HydroNode::ChainFirst {
2057                first,
2058                second,
2059                metadata,
2060            } => HydroNode::ChainFirst {
2061                first: Box::new(first.deep_clone(seen_tees)),
2062                second: Box::new(second.deep_clone(seen_tees)),
2063                metadata: metadata.clone(),
2064            },
2065            HydroNode::CrossProduct {
2066                left,
2067                right,
2068                metadata,
2069            } => HydroNode::CrossProduct {
2070                left: Box::new(left.deep_clone(seen_tees)),
2071                right: Box::new(right.deep_clone(seen_tees)),
2072                metadata: metadata.clone(),
2073            },
2074            HydroNode::CrossSingleton {
2075                left,
2076                right,
2077                metadata,
2078            } => HydroNode::CrossSingleton {
2079                left: Box::new(left.deep_clone(seen_tees)),
2080                right: Box::new(right.deep_clone(seen_tees)),
2081                metadata: metadata.clone(),
2082            },
2083            HydroNode::Join {
2084                left,
2085                right,
2086                metadata,
2087            } => HydroNode::Join {
2088                left: Box::new(left.deep_clone(seen_tees)),
2089                right: Box::new(right.deep_clone(seen_tees)),
2090                metadata: metadata.clone(),
2091            },
2092            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2093                pos: Box::new(pos.deep_clone(seen_tees)),
2094                neg: Box::new(neg.deep_clone(seen_tees)),
2095                metadata: metadata.clone(),
2096            },
2097            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2098                pos: Box::new(pos.deep_clone(seen_tees)),
2099                neg: Box::new(neg.deep_clone(seen_tees)),
2100                metadata: metadata.clone(),
2101            },
2102            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2103                input: Box::new(input.deep_clone(seen_tees)),
2104                metadata: metadata.clone(),
2105            },
2106            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2107                HydroNode::ResolveFuturesOrdered {
2108                    input: Box::new(input.deep_clone(seen_tees)),
2109                    metadata: metadata.clone(),
2110                }
2111            }
2112            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2113                f: f.clone(),
2114                input: Box::new(input.deep_clone(seen_tees)),
2115                metadata: metadata.clone(),
2116            },
2117            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2118                f: f.clone(),
2119                input: Box::new(input.deep_clone(seen_tees)),
2120                metadata: metadata.clone(),
2121            },
2122            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2123                f: f.clone(),
2124                input: Box::new(input.deep_clone(seen_tees)),
2125                metadata: metadata.clone(),
2126            },
2127            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2128                f: f.clone(),
2129                input: Box::new(input.deep_clone(seen_tees)),
2130                metadata: metadata.clone(),
2131            },
2132            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2133                input: Box::new(input.deep_clone(seen_tees)),
2134                metadata: metadata.clone(),
2135            },
2136            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2137                input: Box::new(input.deep_clone(seen_tees)),
2138                metadata: metadata.clone(),
2139            },
2140            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2141                f: f.clone(),
2142                input: Box::new(input.deep_clone(seen_tees)),
2143                metadata: metadata.clone(),
2144            },
2145            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2146                input: Box::new(input.deep_clone(seen_tees)),
2147                metadata: metadata.clone(),
2148            },
2149            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2150                input: Box::new(input.deep_clone(seen_tees)),
2151                metadata: metadata.clone(),
2152            },
2153            HydroNode::Fold {
2154                init,
2155                acc,
2156                input,
2157                metadata,
2158            } => HydroNode::Fold {
2159                init: init.clone(),
2160                acc: acc.clone(),
2161                input: Box::new(input.deep_clone(seen_tees)),
2162                metadata: metadata.clone(),
2163            },
2164            HydroNode::Scan {
2165                init,
2166                acc,
2167                input,
2168                metadata,
2169            } => HydroNode::Scan {
2170                init: init.clone(),
2171                acc: acc.clone(),
2172                input: Box::new(input.deep_clone(seen_tees)),
2173                metadata: metadata.clone(),
2174            },
2175            HydroNode::FoldKeyed {
2176                init,
2177                acc,
2178                input,
2179                metadata,
2180            } => HydroNode::FoldKeyed {
2181                init: init.clone(),
2182                acc: acc.clone(),
2183                input: Box::new(input.deep_clone(seen_tees)),
2184                metadata: metadata.clone(),
2185            },
2186            HydroNode::ReduceKeyedWatermark {
2187                f,
2188                input,
2189                watermark,
2190                metadata,
2191            } => HydroNode::ReduceKeyedWatermark {
2192                f: f.clone(),
2193                input: Box::new(input.deep_clone(seen_tees)),
2194                watermark: Box::new(watermark.deep_clone(seen_tees)),
2195                metadata: metadata.clone(),
2196            },
2197            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2198                f: f.clone(),
2199                input: Box::new(input.deep_clone(seen_tees)),
2200                metadata: metadata.clone(),
2201            },
2202            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2203                f: f.clone(),
2204                input: Box::new(input.deep_clone(seen_tees)),
2205                metadata: metadata.clone(),
2206            },
2207            HydroNode::Network {
2208                name,
2209                serialize_fn,
2210                instantiate_fn,
2211                deserialize_fn,
2212                input,
2213                metadata,
2214            } => HydroNode::Network {
2215                name: name.clone(),
2216                serialize_fn: serialize_fn.clone(),
2217                instantiate_fn: instantiate_fn.clone(),
2218                deserialize_fn: deserialize_fn.clone(),
2219                input: Box::new(input.deep_clone(seen_tees)),
2220                metadata: metadata.clone(),
2221            },
2222            HydroNode::ExternalInput {
2223                from_external_key,
2224                from_port_id,
2225                from_many,
2226                codec_type,
2227                port_hint,
2228                instantiate_fn,
2229                deserialize_fn,
2230                metadata,
2231            } => HydroNode::ExternalInput {
2232                from_external_key: *from_external_key,
2233                from_port_id: *from_port_id,
2234                from_many: *from_many,
2235                codec_type: codec_type.clone(),
2236                port_hint: *port_hint,
2237                instantiate_fn: instantiate_fn.clone(),
2238                deserialize_fn: deserialize_fn.clone(),
2239                metadata: metadata.clone(),
2240            },
2241            HydroNode::Counter {
2242                tag,
2243                duration,
2244                prefix,
2245                input,
2246                metadata,
2247            } => HydroNode::Counter {
2248                tag: tag.clone(),
2249                duration: duration.clone(),
2250                prefix: prefix.clone(),
2251                input: Box::new(input.deep_clone(seen_tees)),
2252                metadata: metadata.clone(),
2253            },
2254        }
2255    }
2256
2257    #[cfg(feature = "build")]
2258    pub fn emit_core(
2259        &mut self,
2260        builders_or_callback: &mut BuildersOrCallback<
2261            impl FnMut(&mut HydroRoot, &mut usize),
2262            impl FnMut(&mut HydroNode, &mut usize),
2263        >,
2264        seen_tees: &mut SeenTees,
2265        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2266        next_stmt_id: &mut usize,
2267    ) -> syn::Ident {
2268        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2269
2270        self.transform_bottom_up(
2271            &mut |node: &mut HydroNode| {
2272                let out_location = node.metadata().location_id.clone();
2273                match node {
2274                    HydroNode::Placeholder => {
2275                        panic!()
2276                    }
2277
2278                    HydroNode::Cast { .. } => {
2279                        // Cast passes through the input ident unchanged
2280                        // The input ident is already on the stack from processing the child
2281                        match builders_or_callback {
2282                            BuildersOrCallback::Builders(_) => {}
2283                            BuildersOrCallback::Callback(_, node_callback) => {
2284                                node_callback(node, next_stmt_id);
2285                            }
2286                        }
2287
2288                        *next_stmt_id += 1;
2289                        // input_ident stays on stack as output
2290                    }
2291
2292                    HydroNode::ObserveNonDet {
2293                        inner,
2294                        trusted,
2295                        metadata,
2296                        ..
2297                    } => {
2298                        let inner_ident = ident_stack.pop().unwrap();
2299
2300                        let observe_ident =
2301                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2302
2303                        match builders_or_callback {
2304                            BuildersOrCallback::Builders(graph_builders) => {
2305                                graph_builders.observe_nondet(
2306                                    *trusted,
2307                                    &inner.metadata().location_id,
2308                                    inner_ident,
2309                                    &inner.metadata().collection_kind,
2310                                    &observe_ident,
2311                                    &metadata.collection_kind,
2312                                    &metadata.op,
2313                                );
2314                            }
2315                            BuildersOrCallback::Callback(_, node_callback) => {
2316                                node_callback(node, next_stmt_id);
2317                            }
2318                        }
2319
2320                        *next_stmt_id += 1;
2321
2322                        ident_stack.push(observe_ident);
2323                    }
2324
2325                    HydroNode::Batch {
2326                        inner, metadata, ..
2327                    } => {
2328                        let inner_ident = ident_stack.pop().unwrap();
2329
2330                        let batch_ident =
2331                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2332
2333                        match builders_or_callback {
2334                            BuildersOrCallback::Builders(graph_builders) => {
2335                                graph_builders.batch(
2336                                    inner_ident,
2337                                    &inner.metadata().location_id,
2338                                    &inner.metadata().collection_kind,
2339                                    &batch_ident,
2340                                    &out_location,
2341                                    &metadata.op,
2342                                );
2343                            }
2344                            BuildersOrCallback::Callback(_, node_callback) => {
2345                                node_callback(node, next_stmt_id);
2346                            }
2347                        }
2348
2349                        *next_stmt_id += 1;
2350
2351                        ident_stack.push(batch_ident);
2352                    }
2353
2354                    HydroNode::YieldConcat { inner, .. } => {
2355                        let inner_ident = ident_stack.pop().unwrap();
2356
2357                        let yield_ident =
2358                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2359
2360                        match builders_or_callback {
2361                            BuildersOrCallback::Builders(graph_builders) => {
2362                                graph_builders.yield_from_tick(
2363                                    inner_ident,
2364                                    &inner.metadata().location_id,
2365                                    &inner.metadata().collection_kind,
2366                                    &yield_ident,
2367                                    &out_location,
2368                                );
2369                            }
2370                            BuildersOrCallback::Callback(_, node_callback) => {
2371                                node_callback(node, next_stmt_id);
2372                            }
2373                        }
2374
2375                        *next_stmt_id += 1;
2376
2377                        ident_stack.push(yield_ident);
2378                    }
2379
2380                    HydroNode::BeginAtomic { inner, metadata } => {
2381                        let inner_ident = ident_stack.pop().unwrap();
2382
2383                        let begin_ident =
2384                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2385
2386                        match builders_or_callback {
2387                            BuildersOrCallback::Builders(graph_builders) => {
2388                                graph_builders.begin_atomic(
2389                                    inner_ident,
2390                                    &inner.metadata().location_id,
2391                                    &inner.metadata().collection_kind,
2392                                    &begin_ident,
2393                                    &out_location,
2394                                    &metadata.op,
2395                                );
2396                            }
2397                            BuildersOrCallback::Callback(_, node_callback) => {
2398                                node_callback(node, next_stmt_id);
2399                            }
2400                        }
2401
2402                        *next_stmt_id += 1;
2403
2404                        ident_stack.push(begin_ident);
2405                    }
2406
2407                    HydroNode::EndAtomic { inner, .. } => {
2408                        let inner_ident = ident_stack.pop().unwrap();
2409
2410                        let end_ident =
2411                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2412
2413                        match builders_or_callback {
2414                            BuildersOrCallback::Builders(graph_builders) => {
2415                                graph_builders.end_atomic(
2416                                    inner_ident,
2417                                    &inner.metadata().location_id,
2418                                    &inner.metadata().collection_kind,
2419                                    &end_ident,
2420                                );
2421                            }
2422                            BuildersOrCallback::Callback(_, node_callback) => {
2423                                node_callback(node, next_stmt_id);
2424                            }
2425                        }
2426
2427                        *next_stmt_id += 1;
2428
2429                        ident_stack.push(end_ident);
2430                    }
2431
2432                    HydroNode::Source {
2433                        source, metadata, ..
2434                    } => {
2435                        if let HydroSource::ExternalNetwork() = source {
2436                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2437                        } else {
2438                            let source_ident =
2439                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2440
2441                            let source_stmt = match source {
2442                                HydroSource::Stream(expr) => {
2443                                    debug_assert!(metadata.location_id.is_top_level());
2444                                    parse_quote! {
2445                                        #source_ident = source_stream(#expr);
2446                                    }
2447                                }
2448
2449                                HydroSource::ExternalNetwork() => {
2450                                    unreachable!()
2451                                }
2452
2453                                HydroSource::Iter(expr) => {
2454                                    if metadata.location_id.is_top_level() {
2455                                        parse_quote! {
2456                                            #source_ident = source_iter(#expr);
2457                                        }
2458                                    } else {
2459                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2460                                        parse_quote! {
2461                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2462                                        }
2463                                    }
2464                                }
2465
2466                                HydroSource::Spin() => {
2467                                    debug_assert!(metadata.location_id.is_top_level());
2468                                    parse_quote! {
2469                                        #source_ident = spin();
2470                                    }
2471                                }
2472
2473                                HydroSource::ClusterMembers(target_loc, state) => {
2474                                    debug_assert!(metadata.location_id.is_top_level());
2475
2476                                    let members_tee_ident = syn::Ident::new(
2477                                        &format!(
2478                                            "__cluster_members_tee_{}_{}",
2479                                            metadata.location_id.root().key(),
2480                                            target_loc.key(),
2481                                        ),
2482                                        Span::call_site(),
2483                                    );
2484
2485                                    match state {
2486                                        ClusterMembersState::Stream(d) => {
2487                                            parse_quote! {
2488                                                #members_tee_ident = source_stream(#d) -> tee();
2489                                                #source_ident = #members_tee_ident;
2490                                            }
2491                                        },
2492                                        ClusterMembersState::Uninit => syn::parse_quote! {
2493                                            #source_ident = source_stream(DUMMY);
2494                                        },
2495                                        ClusterMembersState::Tee(..) => parse_quote! {
2496                                            #source_ident = #members_tee_ident;
2497                                        },
2498                                    }
2499                                }
2500
2501                                HydroSource::Embedded(ident) => {
2502                                    parse_quote! {
2503                                        #source_ident = source_stream(#ident);
2504                                    }
2505                                }
2506                            };
2507
2508                            match builders_or_callback {
2509                                BuildersOrCallback::Builders(graph_builders) => {
2510                                    let builder = graph_builders.get_dfir_mut(&out_location);
2511                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2512                                }
2513                                BuildersOrCallback::Callback(_, node_callback) => {
2514                                    node_callback(node, next_stmt_id);
2515                                }
2516                            }
2517
2518                            *next_stmt_id += 1;
2519
2520                            ident_stack.push(source_ident);
2521                        }
2522                    }
2523
2524                    HydroNode::SingletonSource { value, metadata } => {
2525                        let source_ident =
2526                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2527
2528                        match builders_or_callback {
2529                            BuildersOrCallback::Builders(graph_builders) => {
2530                                let builder = graph_builders.get_dfir_mut(&out_location);
2531
2532                                if metadata.location_id.is_top_level()
2533                                    && metadata.collection_kind.is_bounded()
2534                                {
2535                                    builder.add_dfir(
2536                                        parse_quote! {
2537                                            #source_ident = source_iter([#value]);
2538                                        },
2539                                        None,
2540                                        Some(&next_stmt_id.to_string()),
2541                                    );
2542                                } else {
2543                                    builder.add_dfir(
2544                                        parse_quote! {
2545                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2546                                        },
2547                                        None,
2548                                        Some(&next_stmt_id.to_string()),
2549                                    );
2550                                }
2551                            }
2552                            BuildersOrCallback::Callback(_, node_callback) => {
2553                                node_callback(node, next_stmt_id);
2554                            }
2555                        }
2556
2557                        *next_stmt_id += 1;
2558
2559                        ident_stack.push(source_ident);
2560                    }
2561
2562                    HydroNode::CycleSource { cycle_id, .. } => {
2563                        let ident = cycle_id.as_ident();
2564
2565                        match builders_or_callback {
2566                            BuildersOrCallback::Builders(_) => {}
2567                            BuildersOrCallback::Callback(_, node_callback) => {
2568                                node_callback(node, next_stmt_id);
2569                            }
2570                        }
2571
2572                        // consume a stmt id even though we did not emit anything so that we can instrument this
2573                        *next_stmt_id += 1;
2574
2575                        ident_stack.push(ident);
2576                    }
2577
2578                    HydroNode::Tee { inner, .. } => {
2579                        let ret_ident = if let Some(teed_from) =
2580                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2581                        {
2582                            match builders_or_callback {
2583                                BuildersOrCallback::Builders(_) => {}
2584                                BuildersOrCallback::Callback(_, node_callback) => {
2585                                    node_callback(node, next_stmt_id);
2586                                }
2587                            }
2588
2589                            teed_from.clone()
2590                        } else {
2591                            // The inner node was already processed by transform_bottom_up,
2592                            // so its ident is on the stack
2593                            let inner_ident = ident_stack.pop().unwrap();
2594
2595                            let tee_ident =
2596                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2597
2598                            built_tees.insert(
2599                                inner.0.as_ref() as *const RefCell<HydroNode>,
2600                                tee_ident.clone(),
2601                            );
2602
2603                            match builders_or_callback {
2604                                BuildersOrCallback::Builders(graph_builders) => {
2605                                    let builder = graph_builders.get_dfir_mut(&out_location);
2606                                    builder.add_dfir(
2607                                        parse_quote! {
2608                                            #tee_ident = #inner_ident -> tee();
2609                                        },
2610                                        None,
2611                                        Some(&next_stmt_id.to_string()),
2612                                    );
2613                                }
2614                                BuildersOrCallback::Callback(_, node_callback) => {
2615                                    node_callback(node, next_stmt_id);
2616                                }
2617                            }
2618
2619                            tee_ident
2620                        };
2621
2622                        // we consume a stmt id regardless of if we emit the tee() operator,
2623                        // so that during rewrites we touch all recipients of the tee()
2624
2625                        *next_stmt_id += 1;
2626                        ident_stack.push(ret_ident);
2627                    }
2628
2629                    HydroNode::Chain { .. } => {
2630                        // Children are processed left-to-right, so second is on top
2631                        let second_ident = ident_stack.pop().unwrap();
2632                        let first_ident = ident_stack.pop().unwrap();
2633
2634                        let chain_ident =
2635                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2636
2637                        match builders_or_callback {
2638                            BuildersOrCallback::Builders(graph_builders) => {
2639                                let builder = graph_builders.get_dfir_mut(&out_location);
2640                                builder.add_dfir(
2641                                    parse_quote! {
2642                                        #chain_ident = chain();
2643                                        #first_ident -> [0]#chain_ident;
2644                                        #second_ident -> [1]#chain_ident;
2645                                    },
2646                                    None,
2647                                    Some(&next_stmt_id.to_string()),
2648                                );
2649                            }
2650                            BuildersOrCallback::Callback(_, node_callback) => {
2651                                node_callback(node, next_stmt_id);
2652                            }
2653                        }
2654
2655                        *next_stmt_id += 1;
2656
2657                        ident_stack.push(chain_ident);
2658                    }
2659
2660                    HydroNode::ChainFirst { .. } => {
2661                        let second_ident = ident_stack.pop().unwrap();
2662                        let first_ident = ident_stack.pop().unwrap();
2663
2664                        let chain_ident =
2665                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2666
2667                        match builders_or_callback {
2668                            BuildersOrCallback::Builders(graph_builders) => {
2669                                let builder = graph_builders.get_dfir_mut(&out_location);
2670                                builder.add_dfir(
2671                                    parse_quote! {
2672                                        #chain_ident = chain_first_n(1);
2673                                        #first_ident -> [0]#chain_ident;
2674                                        #second_ident -> [1]#chain_ident;
2675                                    },
2676                                    None,
2677                                    Some(&next_stmt_id.to_string()),
2678                                );
2679                            }
2680                            BuildersOrCallback::Callback(_, node_callback) => {
2681                                node_callback(node, next_stmt_id);
2682                            }
2683                        }
2684
2685                        *next_stmt_id += 1;
2686
2687                        ident_stack.push(chain_ident);
2688                    }
2689
2690                    HydroNode::CrossSingleton { right, .. } => {
2691                        let right_ident = ident_stack.pop().unwrap();
2692                        let left_ident = ident_stack.pop().unwrap();
2693
2694                        let cross_ident =
2695                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2696
2697                        match builders_or_callback {
2698                            BuildersOrCallback::Builders(graph_builders) => {
2699                                let builder = graph_builders.get_dfir_mut(&out_location);
2700
2701                                if right.metadata().location_id.is_top_level()
2702                                    && right.metadata().collection_kind.is_bounded()
2703                                {
2704                                    builder.add_dfir(
2705                                        parse_quote! {
2706                                            #cross_ident = cross_singleton();
2707                                            #left_ident -> [input]#cross_ident;
2708                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2709                                        },
2710                                        None,
2711                                        Some(&next_stmt_id.to_string()),
2712                                    );
2713                                } else {
2714                                    builder.add_dfir(
2715                                        parse_quote! {
2716                                            #cross_ident = cross_singleton();
2717                                            #left_ident -> [input]#cross_ident;
2718                                            #right_ident -> [single]#cross_ident;
2719                                        },
2720                                        None,
2721                                        Some(&next_stmt_id.to_string()),
2722                                    );
2723                                }
2724                            }
2725                            BuildersOrCallback::Callback(_, node_callback) => {
2726                                node_callback(node, next_stmt_id);
2727                            }
2728                        }
2729
2730                        *next_stmt_id += 1;
2731
2732                        ident_stack.push(cross_ident);
2733                    }
2734
2735                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2736                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2737                            parse_quote!(cross_join_multiset)
2738                        } else {
2739                            parse_quote!(join_multiset)
2740                        };
2741
2742                        let (HydroNode::CrossProduct { left, right, .. }
2743                        | HydroNode::Join { left, right, .. }) = node
2744                        else {
2745                            unreachable!()
2746                        };
2747
2748                        let is_top_level = left.metadata().location_id.is_top_level()
2749                            && right.metadata().location_id.is_top_level();
2750                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2751                            quote!('static)
2752                        } else {
2753                            quote!('tick)
2754                        };
2755
2756                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2757                            quote!('static)
2758                        } else {
2759                            quote!('tick)
2760                        };
2761
2762                        let right_ident = ident_stack.pop().unwrap();
2763                        let left_ident = ident_stack.pop().unwrap();
2764
2765                        let stream_ident =
2766                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2767
2768                        match builders_or_callback {
2769                            BuildersOrCallback::Builders(graph_builders) => {
2770                                let builder = graph_builders.get_dfir_mut(&out_location);
2771                                builder.add_dfir(
2772                                    if is_top_level {
2773                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2774                                        // a multiset_delta() to negate the replay behavior
2775                                        parse_quote! {
2776                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2777                                            #left_ident -> [0]#stream_ident;
2778                                            #right_ident -> [1]#stream_ident;
2779                                        }
2780                                    } else {
2781                                        parse_quote! {
2782                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2783                                            #left_ident -> [0]#stream_ident;
2784                                            #right_ident -> [1]#stream_ident;
2785                                        }
2786                                    }
2787                                    ,
2788                                    None,
2789                                    Some(&next_stmt_id.to_string()),
2790                                );
2791                            }
2792                            BuildersOrCallback::Callback(_, node_callback) => {
2793                                node_callback(node, next_stmt_id);
2794                            }
2795                        }
2796
2797                        *next_stmt_id += 1;
2798
2799                        ident_stack.push(stream_ident);
2800                    }
2801
2802                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2803                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2804                            parse_quote!(difference)
2805                        } else {
2806                            parse_quote!(anti_join)
2807                        };
2808
2809                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2810                            node
2811                        else {
2812                            unreachable!()
2813                        };
2814
2815                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2816                            quote!('static)
2817                        } else {
2818                            quote!('tick)
2819                        };
2820
2821                        let neg_ident = ident_stack.pop().unwrap();
2822                        let pos_ident = ident_stack.pop().unwrap();
2823
2824                        let stream_ident =
2825                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2826
2827                        match builders_or_callback {
2828                            BuildersOrCallback::Builders(graph_builders) => {
2829                                let builder = graph_builders.get_dfir_mut(&out_location);
2830                                builder.add_dfir(
2831                                    parse_quote! {
2832                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2833                                        #pos_ident -> [pos]#stream_ident;
2834                                        #neg_ident -> [neg]#stream_ident;
2835                                    },
2836                                    None,
2837                                    Some(&next_stmt_id.to_string()),
2838                                );
2839                            }
2840                            BuildersOrCallback::Callback(_, node_callback) => {
2841                                node_callback(node, next_stmt_id);
2842                            }
2843                        }
2844
2845                        *next_stmt_id += 1;
2846
2847                        ident_stack.push(stream_ident);
2848                    }
2849
2850                    HydroNode::ResolveFutures { .. } => {
2851                        let input_ident = ident_stack.pop().unwrap();
2852
2853                        let futures_ident =
2854                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2855
2856                        match builders_or_callback {
2857                            BuildersOrCallback::Builders(graph_builders) => {
2858                                let builder = graph_builders.get_dfir_mut(&out_location);
2859                                builder.add_dfir(
2860                                    parse_quote! {
2861                                        #futures_ident = #input_ident -> resolve_futures();
2862                                    },
2863                                    None,
2864                                    Some(&next_stmt_id.to_string()),
2865                                );
2866                            }
2867                            BuildersOrCallback::Callback(_, node_callback) => {
2868                                node_callback(node, next_stmt_id);
2869                            }
2870                        }
2871
2872                        *next_stmt_id += 1;
2873
2874                        ident_stack.push(futures_ident);
2875                    }
2876
2877                    HydroNode::ResolveFuturesOrdered { .. } => {
2878                        let input_ident = ident_stack.pop().unwrap();
2879
2880                        let futures_ident =
2881                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2882
2883                        match builders_or_callback {
2884                            BuildersOrCallback::Builders(graph_builders) => {
2885                                let builder = graph_builders.get_dfir_mut(&out_location);
2886                                builder.add_dfir(
2887                                    parse_quote! {
2888                                        #futures_ident = #input_ident -> resolve_futures_ordered();
2889                                    },
2890                                    None,
2891                                    Some(&next_stmt_id.to_string()),
2892                                );
2893                            }
2894                            BuildersOrCallback::Callback(_, node_callback) => {
2895                                node_callback(node, next_stmt_id);
2896                            }
2897                        }
2898
2899                        *next_stmt_id += 1;
2900
2901                        ident_stack.push(futures_ident);
2902                    }
2903
2904                    HydroNode::Map { f, .. } => {
2905                        let input_ident = ident_stack.pop().unwrap();
2906
2907                        let map_ident =
2908                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2909
2910                        match builders_or_callback {
2911                            BuildersOrCallback::Builders(graph_builders) => {
2912                                let builder = graph_builders.get_dfir_mut(&out_location);
2913                                builder.add_dfir(
2914                                    parse_quote! {
2915                                        #map_ident = #input_ident -> map(#f);
2916                                    },
2917                                    None,
2918                                    Some(&next_stmt_id.to_string()),
2919                                );
2920                            }
2921                            BuildersOrCallback::Callback(_, node_callback) => {
2922                                node_callback(node, next_stmt_id);
2923                            }
2924                        }
2925
2926                        *next_stmt_id += 1;
2927
2928                        ident_stack.push(map_ident);
2929                    }
2930
2931                    HydroNode::FlatMap { f, .. } => {
2932                        let input_ident = ident_stack.pop().unwrap();
2933
2934                        let flat_map_ident =
2935                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2936
2937                        match builders_or_callback {
2938                            BuildersOrCallback::Builders(graph_builders) => {
2939                                let builder = graph_builders.get_dfir_mut(&out_location);
2940                                builder.add_dfir(
2941                                    parse_quote! {
2942                                        #flat_map_ident = #input_ident -> flat_map(#f);
2943                                    },
2944                                    None,
2945                                    Some(&next_stmt_id.to_string()),
2946                                );
2947                            }
2948                            BuildersOrCallback::Callback(_, node_callback) => {
2949                                node_callback(node, next_stmt_id);
2950                            }
2951                        }
2952
2953                        *next_stmt_id += 1;
2954
2955                        ident_stack.push(flat_map_ident);
2956                    }
2957
2958                    HydroNode::Filter { f, .. } => {
2959                        let input_ident = ident_stack.pop().unwrap();
2960
2961                        let filter_ident =
2962                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2963
2964                        match builders_or_callback {
2965                            BuildersOrCallback::Builders(graph_builders) => {
2966                                let builder = graph_builders.get_dfir_mut(&out_location);
2967                                builder.add_dfir(
2968                                    parse_quote! {
2969                                        #filter_ident = #input_ident -> filter(#f);
2970                                    },
2971                                    None,
2972                                    Some(&next_stmt_id.to_string()),
2973                                );
2974                            }
2975                            BuildersOrCallback::Callback(_, node_callback) => {
2976                                node_callback(node, next_stmt_id);
2977                            }
2978                        }
2979
2980                        *next_stmt_id += 1;
2981
2982                        ident_stack.push(filter_ident);
2983                    }
2984
2985                    HydroNode::FilterMap { f, .. } => {
2986                        let input_ident = ident_stack.pop().unwrap();
2987
2988                        let filter_map_ident =
2989                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2990
2991                        match builders_or_callback {
2992                            BuildersOrCallback::Builders(graph_builders) => {
2993                                let builder = graph_builders.get_dfir_mut(&out_location);
2994                                builder.add_dfir(
2995                                    parse_quote! {
2996                                        #filter_map_ident = #input_ident -> filter_map(#f);
2997                                    },
2998                                    None,
2999                                    Some(&next_stmt_id.to_string()),
3000                                );
3001                            }
3002                            BuildersOrCallback::Callback(_, node_callback) => {
3003                                node_callback(node, next_stmt_id);
3004                            }
3005                        }
3006
3007                        *next_stmt_id += 1;
3008
3009                        ident_stack.push(filter_map_ident);
3010                    }
3011
3012                    HydroNode::Sort { .. } => {
3013                        let input_ident = ident_stack.pop().unwrap();
3014
3015                        let sort_ident =
3016                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3017
3018                        match builders_or_callback {
3019                            BuildersOrCallback::Builders(graph_builders) => {
3020                                let builder = graph_builders.get_dfir_mut(&out_location);
3021                                builder.add_dfir(
3022                                    parse_quote! {
3023                                        #sort_ident = #input_ident -> sort();
3024                                    },
3025                                    None,
3026                                    Some(&next_stmt_id.to_string()),
3027                                );
3028                            }
3029                            BuildersOrCallback::Callback(_, node_callback) => {
3030                                node_callback(node, next_stmt_id);
3031                            }
3032                        }
3033
3034                        *next_stmt_id += 1;
3035
3036                        ident_stack.push(sort_ident);
3037                    }
3038
3039                    HydroNode::DeferTick { .. } => {
3040                        let input_ident = ident_stack.pop().unwrap();
3041
3042                        let defer_tick_ident =
3043                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3044
3045                        match builders_or_callback {
3046                            BuildersOrCallback::Builders(graph_builders) => {
3047                                let builder = graph_builders.get_dfir_mut(&out_location);
3048                                builder.add_dfir(
3049                                    parse_quote! {
3050                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3051                                    },
3052                                    None,
3053                                    Some(&next_stmt_id.to_string()),
3054                                );
3055                            }
3056                            BuildersOrCallback::Callback(_, node_callback) => {
3057                                node_callback(node, next_stmt_id);
3058                            }
3059                        }
3060
3061                        *next_stmt_id += 1;
3062
3063                        ident_stack.push(defer_tick_ident);
3064                    }
3065
3066                    HydroNode::Enumerate { input, .. } => {
3067                        let input_ident = ident_stack.pop().unwrap();
3068
3069                        let enumerate_ident =
3070                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3071
3072                        match builders_or_callback {
3073                            BuildersOrCallback::Builders(graph_builders) => {
3074                                let builder = graph_builders.get_dfir_mut(&out_location);
3075                                let lifetime = if input.metadata().location_id.is_top_level() {
3076                                    quote!('static)
3077                                } else {
3078                                    quote!('tick)
3079                                };
3080                                builder.add_dfir(
3081                                    parse_quote! {
3082                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3083                                    },
3084                                    None,
3085                                    Some(&next_stmt_id.to_string()),
3086                                );
3087                            }
3088                            BuildersOrCallback::Callback(_, node_callback) => {
3089                                node_callback(node, next_stmt_id);
3090                            }
3091                        }
3092
3093                        *next_stmt_id += 1;
3094
3095                        ident_stack.push(enumerate_ident);
3096                    }
3097
3098                    HydroNode::Inspect { f, .. } => {
3099                        let input_ident = ident_stack.pop().unwrap();
3100
3101                        let inspect_ident =
3102                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3103
3104                        match builders_or_callback {
3105                            BuildersOrCallback::Builders(graph_builders) => {
3106                                let builder = graph_builders.get_dfir_mut(&out_location);
3107                                builder.add_dfir(
3108                                    parse_quote! {
3109                                        #inspect_ident = #input_ident -> inspect(#f);
3110                                    },
3111                                    None,
3112                                    Some(&next_stmt_id.to_string()),
3113                                );
3114                            }
3115                            BuildersOrCallback::Callback(_, node_callback) => {
3116                                node_callback(node, next_stmt_id);
3117                            }
3118                        }
3119
3120                        *next_stmt_id += 1;
3121
3122                        ident_stack.push(inspect_ident);
3123                    }
3124
3125                    HydroNode::Unique { input, .. } => {
3126                        let input_ident = ident_stack.pop().unwrap();
3127
3128                        let unique_ident =
3129                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3130
3131                        match builders_or_callback {
3132                            BuildersOrCallback::Builders(graph_builders) => {
3133                                let builder = graph_builders.get_dfir_mut(&out_location);
3134                                let lifetime = if input.metadata().location_id.is_top_level() {
3135                                    quote!('static)
3136                                } else {
3137                                    quote!('tick)
3138                                };
3139
3140                                builder.add_dfir(
3141                                    parse_quote! {
3142                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3143                                    },
3144                                    None,
3145                                    Some(&next_stmt_id.to_string()),
3146                                );
3147                            }
3148                            BuildersOrCallback::Callback(_, node_callback) => {
3149                                node_callback(node, next_stmt_id);
3150                            }
3151                        }
3152
3153                        *next_stmt_id += 1;
3154
3155                        ident_stack.push(unique_ident);
3156                    }
3157
3158                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3159                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3160                            if input.metadata().location_id.is_top_level()
3161                                && input.metadata().collection_kind.is_bounded()
3162                            {
3163                                parse_quote!(fold_no_replay)
3164                            } else {
3165                                parse_quote!(fold)
3166                            }
3167                        } else if matches!(node, HydroNode::Scan { .. }) {
3168                            parse_quote!(scan)
3169                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3170                            if input.metadata().location_id.is_top_level()
3171                                && input.metadata().collection_kind.is_bounded()
3172                            {
3173                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3174                            } else {
3175                                parse_quote!(fold_keyed)
3176                            }
3177                        } else {
3178                            unreachable!()
3179                        };
3180
3181                        let (HydroNode::Fold { input, .. }
3182                        | HydroNode::FoldKeyed { input, .. }
3183                        | HydroNode::Scan { input, .. }) = node
3184                        else {
3185                            unreachable!()
3186                        };
3187
3188                        let lifetime = if input.metadata().location_id.is_top_level() {
3189                            quote!('static)
3190                        } else {
3191                            quote!('tick)
3192                        };
3193
3194                        let input_ident = ident_stack.pop().unwrap();
3195
3196                        let (HydroNode::Fold { init, acc, .. }
3197                        | HydroNode::FoldKeyed { init, acc, .. }
3198                        | HydroNode::Scan { init, acc, .. }) = &*node
3199                        else {
3200                            unreachable!()
3201                        };
3202
3203                        let fold_ident =
3204                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3205
3206                        match builders_or_callback {
3207                            BuildersOrCallback::Builders(graph_builders) => {
3208                                if matches!(node, HydroNode::Fold { .. })
3209                                    && node.metadata().location_id.is_top_level()
3210                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3211                                    && graph_builders.singleton_intermediates()
3212                                    && !node.metadata().collection_kind.is_bounded()
3213                                {
3214                                    let builder = graph_builders.get_dfir_mut(&out_location);
3215
3216                                    let acc: syn::Expr = parse_quote!({
3217                                        let mut __inner = #acc;
3218                                        move |__state, __value| {
3219                                            __inner(__state, __value);
3220                                            Some(__state.clone())
3221                                        }
3222                                    });
3223
3224                                    builder.add_dfir(
3225                                        parse_quote! {
3226                                            source_iter([(#init)()]) -> [0]#fold_ident;
3227                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3228                                            #fold_ident = chain();
3229                                        },
3230                                        None,
3231                                        Some(&next_stmt_id.to_string()),
3232                                    );
3233                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3234                                    && node.metadata().location_id.is_top_level()
3235                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3236                                    && graph_builders.singleton_intermediates()
3237                                    && !node.metadata().collection_kind.is_bounded()
3238                                {
3239                                    let builder = graph_builders.get_dfir_mut(&out_location);
3240
3241                                    let acc: syn::Expr = parse_quote!({
3242                                        let mut __init = #init;
3243                                        let mut __inner = #acc;
3244                                        move |__state, __kv: (_, _)| {
3245                                            // TODO(shadaj): we can avoid the clone when the entry exists
3246                                            let __state = __state
3247                                                .entry(::std::clone::Clone::clone(&__kv.0))
3248                                                .or_insert_with(|| (__init)());
3249                                            __inner(__state, __kv.1);
3250                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3251                                        }
3252                                    });
3253
3254                                    builder.add_dfir(
3255                                        parse_quote! {
3256                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3257                                        },
3258                                        None,
3259                                        Some(&next_stmt_id.to_string()),
3260                                    );
3261                                } else {
3262                                    let builder = graph_builders.get_dfir_mut(&out_location);
3263                                    builder.add_dfir(
3264                                        parse_quote! {
3265                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3266                                        },
3267                                        None,
3268                                        Some(&next_stmt_id.to_string()),
3269                                    );
3270                                }
3271                            }
3272                            BuildersOrCallback::Callback(_, node_callback) => {
3273                                node_callback(node, next_stmt_id);
3274                            }
3275                        }
3276
3277                        *next_stmt_id += 1;
3278
3279                        ident_stack.push(fold_ident);
3280                    }
3281
3282                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3283                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3284                            if input.metadata().location_id.is_top_level()
3285                                && input.metadata().collection_kind.is_bounded()
3286                            {
3287                                parse_quote!(reduce_no_replay)
3288                            } else {
3289                                parse_quote!(reduce)
3290                            }
3291                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3292                            if input.metadata().location_id.is_top_level()
3293                                && input.metadata().collection_kind.is_bounded()
3294                            {
3295                                todo!(
3296                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3297                                )
3298                            } else {
3299                                parse_quote!(reduce_keyed)
3300                            }
3301                        } else {
3302                            unreachable!()
3303                        };
3304
3305                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3306                        else {
3307                            unreachable!()
3308                        };
3309
3310                        let lifetime = if input.metadata().location_id.is_top_level() {
3311                            quote!('static)
3312                        } else {
3313                            quote!('tick)
3314                        };
3315
3316                        let input_ident = ident_stack.pop().unwrap();
3317
3318                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3319                        else {
3320                            unreachable!()
3321                        };
3322
3323                        let reduce_ident =
3324                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3325
3326                        match builders_or_callback {
3327                            BuildersOrCallback::Builders(graph_builders) => {
3328                                if matches!(node, HydroNode::Reduce { .. })
3329                                    && node.metadata().location_id.is_top_level()
3330                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3331                                    && graph_builders.singleton_intermediates()
3332                                    && !node.metadata().collection_kind.is_bounded()
3333                                {
3334                                    todo!(
3335                                        "Reduce with optional intermediates is not yet supported in simulator"
3336                                    );
3337                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3338                                    && node.metadata().location_id.is_top_level()
3339                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3340                                    && graph_builders.singleton_intermediates()
3341                                    && !node.metadata().collection_kind.is_bounded()
3342                                {
3343                                    todo!(
3344                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3345                                    );
3346                                } else {
3347                                    let builder = graph_builders.get_dfir_mut(&out_location);
3348                                    builder.add_dfir(
3349                                        parse_quote! {
3350                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3351                                        },
3352                                        None,
3353                                        Some(&next_stmt_id.to_string()),
3354                                    );
3355                                }
3356                            }
3357                            BuildersOrCallback::Callback(_, node_callback) => {
3358                                node_callback(node, next_stmt_id);
3359                            }
3360                        }
3361
3362                        *next_stmt_id += 1;
3363
3364                        ident_stack.push(reduce_ident);
3365                    }
3366
3367                    HydroNode::ReduceKeyedWatermark {
3368                        f,
3369                        input,
3370                        metadata,
3371                        ..
3372                    } => {
3373                        let lifetime = if input.metadata().location_id.is_top_level() {
3374                            quote!('static)
3375                        } else {
3376                            quote!('tick)
3377                        };
3378
3379                        // watermark is processed second, so it's on top
3380                        let watermark_ident = ident_stack.pop().unwrap();
3381                        let input_ident = ident_stack.pop().unwrap();
3382
3383                        let chain_ident = syn::Ident::new(
3384                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3385                            Span::call_site(),
3386                        );
3387
3388                        let fold_ident =
3389                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3390
3391                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3392                            && input.metadata().collection_kind.is_bounded()
3393                        {
3394                            parse_quote!(fold_no_replay)
3395                        } else {
3396                            parse_quote!(fold)
3397                        };
3398
3399                        match builders_or_callback {
3400                            BuildersOrCallback::Builders(graph_builders) => {
3401                                if metadata.location_id.is_top_level()
3402                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3403                                    && graph_builders.singleton_intermediates()
3404                                    && !metadata.collection_kind.is_bounded()
3405                                {
3406                                    todo!(
3407                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3408                                    )
3409                                } else {
3410                                    let builder = graph_builders.get_dfir_mut(&out_location);
3411                                    builder.add_dfir(
3412                                        parse_quote! {
3413                                            #chain_ident = chain();
3414                                            #input_ident
3415                                                -> map(|x| (Some(x), None))
3416                                                -> [0]#chain_ident;
3417                                            #watermark_ident
3418                                                -> map(|watermark| (None, Some(watermark)))
3419                                                -> [1]#chain_ident;
3420
3421                                            #fold_ident = #chain_ident
3422                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3423                                                    let __reduce_keyed_fn = #f;
3424                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3425                                                        if let Some((k, v)) = opt_payload {
3426                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3427                                                                if k <= curr_watermark {
3428                                                                    return;
3429                                                                }
3430                                                            }
3431                                                            match map.entry(k) {
3432                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3433                                                                    e.insert(v);
3434                                                                }
3435                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3436                                                                    __reduce_keyed_fn(e.get_mut(), v);
3437                                                                }
3438                                                            }
3439                                                        } else {
3440                                                            let watermark = opt_watermark.unwrap();
3441                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3442                                                                if watermark <= curr_watermark {
3443                                                                    return;
3444                                                                }
3445                                                            }
3446                                                            *opt_curr_watermark = opt_watermark;
3447                                                            map.retain(|k, _| *k > watermark);
3448                                                        }
3449                                                    }
3450                                                })
3451                                                -> flat_map(|(map, _curr_watermark)| map);
3452                                        },
3453                                        None,
3454                                        Some(&next_stmt_id.to_string()),
3455                                    );
3456                                }
3457                            }
3458                            BuildersOrCallback::Callback(_, node_callback) => {
3459                                node_callback(node, next_stmt_id);
3460                            }
3461                        }
3462
3463                        *next_stmt_id += 1;
3464
3465                        ident_stack.push(fold_ident);
3466                    }
3467
3468                    HydroNode::Network {
3469                        serialize_fn: serialize_pipeline,
3470                        instantiate_fn,
3471                        deserialize_fn: deserialize_pipeline,
3472                        input,
3473                        ..
3474                    } => {
3475                        let input_ident = ident_stack.pop().unwrap();
3476
3477                        let receiver_stream_ident =
3478                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3479
3480                        match builders_or_callback {
3481                            BuildersOrCallback::Builders(graph_builders) => {
3482                                let (sink_expr, source_expr) = match instantiate_fn {
3483                                    DebugInstantiate::Building => (
3484                                        syn::parse_quote!(DUMMY_SINK),
3485                                        syn::parse_quote!(DUMMY_SOURCE),
3486                                    ),
3487
3488                                    DebugInstantiate::Finalized(finalized) => {
3489                                        (finalized.sink.clone(), finalized.source.clone())
3490                                    }
3491                                };
3492
3493                                graph_builders.create_network(
3494                                    &input.metadata().location_id,
3495                                    &out_location,
3496                                    input_ident,
3497                                    &receiver_stream_ident,
3498                                    serialize_pipeline.as_ref(),
3499                                    sink_expr,
3500                                    source_expr,
3501                                    deserialize_pipeline.as_ref(),
3502                                    *next_stmt_id,
3503                                );
3504                            }
3505                            BuildersOrCallback::Callback(_, node_callback) => {
3506                                node_callback(node, next_stmt_id);
3507                            }
3508                        }
3509
3510                        *next_stmt_id += 1;
3511
3512                        ident_stack.push(receiver_stream_ident);
3513                    }
3514
3515                    HydroNode::ExternalInput {
3516                        instantiate_fn,
3517                        deserialize_fn: deserialize_pipeline,
3518                        ..
3519                    } => {
3520                        let receiver_stream_ident =
3521                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3522
3523                        match builders_or_callback {
3524                            BuildersOrCallback::Builders(graph_builders) => {
3525                                let (_, source_expr) = match instantiate_fn {
3526                                    DebugInstantiate::Building => (
3527                                        syn::parse_quote!(DUMMY_SINK),
3528                                        syn::parse_quote!(DUMMY_SOURCE),
3529                                    ),
3530
3531                                    DebugInstantiate::Finalized(finalized) => {
3532                                        (finalized.sink.clone(), finalized.source.clone())
3533                                    }
3534                                };
3535
3536                                graph_builders.create_external_source(
3537                                    &out_location,
3538                                    source_expr,
3539                                    &receiver_stream_ident,
3540                                    deserialize_pipeline.as_ref(),
3541                                    *next_stmt_id,
3542                                );
3543                            }
3544                            BuildersOrCallback::Callback(_, node_callback) => {
3545                                node_callback(node, next_stmt_id);
3546                            }
3547                        }
3548
3549                        *next_stmt_id += 1;
3550
3551                        ident_stack.push(receiver_stream_ident);
3552                    }
3553
3554                    HydroNode::Counter {
3555                        tag,
3556                        duration,
3557                        prefix,
3558                        ..
3559                    } => {
3560                        let input_ident = ident_stack.pop().unwrap();
3561
3562                        let counter_ident =
3563                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3564
3565                        match builders_or_callback {
3566                            BuildersOrCallback::Builders(graph_builders) => {
3567                                let arg = format!("{}({})", prefix, tag);
3568                                let builder = graph_builders.get_dfir_mut(&out_location);
3569                                builder.add_dfir(
3570                                    parse_quote! {
3571                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
3572                                    },
3573                                    None,
3574                                    Some(&next_stmt_id.to_string()),
3575                                );
3576                            }
3577                            BuildersOrCallback::Callback(_, node_callback) => {
3578                                node_callback(node, next_stmt_id);
3579                            }
3580                        }
3581
3582                        *next_stmt_id += 1;
3583
3584                        ident_stack.push(counter_ident);
3585                    }
3586                }
3587            },
3588            seen_tees,
3589            false,
3590        );
3591
3592        ident_stack
3593            .pop()
3594            .expect("ident_stack should have exactly one element after traversal")
3595    }
3596
3597    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3598        match self {
3599            HydroNode::Placeholder => {
3600                panic!()
3601            }
3602            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3603            HydroNode::Source { source, .. } => match source {
3604                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3605                HydroSource::ExternalNetwork()
3606                | HydroSource::Spin()
3607                | HydroSource::ClusterMembers(_, _)
3608                | HydroSource::Embedded(_) => {} // TODO: what goes here?
3609            },
3610            HydroNode::SingletonSource { value, .. } => {
3611                transform(value);
3612            }
3613            HydroNode::CycleSource { .. }
3614            | HydroNode::Tee { .. }
3615            | HydroNode::YieldConcat { .. }
3616            | HydroNode::BeginAtomic { .. }
3617            | HydroNode::EndAtomic { .. }
3618            | HydroNode::Batch { .. }
3619            | HydroNode::Chain { .. }
3620            | HydroNode::ChainFirst { .. }
3621            | HydroNode::CrossProduct { .. }
3622            | HydroNode::CrossSingleton { .. }
3623            | HydroNode::ResolveFutures { .. }
3624            | HydroNode::ResolveFuturesOrdered { .. }
3625            | HydroNode::Join { .. }
3626            | HydroNode::Difference { .. }
3627            | HydroNode::AntiJoin { .. }
3628            | HydroNode::DeferTick { .. }
3629            | HydroNode::Enumerate { .. }
3630            | HydroNode::Unique { .. }
3631            | HydroNode::Sort { .. } => {}
3632            HydroNode::Map { f, .. }
3633            | HydroNode::FlatMap { f, .. }
3634            | HydroNode::Filter { f, .. }
3635            | HydroNode::FilterMap { f, .. }
3636            | HydroNode::Inspect { f, .. }
3637            | HydroNode::Reduce { f, .. }
3638            | HydroNode::ReduceKeyed { f, .. }
3639            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3640                transform(f);
3641            }
3642            HydroNode::Fold { init, acc, .. }
3643            | HydroNode::Scan { init, acc, .. }
3644            | HydroNode::FoldKeyed { init, acc, .. } => {
3645                transform(init);
3646                transform(acc);
3647            }
3648            HydroNode::Network {
3649                serialize_fn,
3650                deserialize_fn,
3651                ..
3652            } => {
3653                if let Some(serialize_fn) = serialize_fn {
3654                    transform(serialize_fn);
3655                }
3656                if let Some(deserialize_fn) = deserialize_fn {
3657                    transform(deserialize_fn);
3658                }
3659            }
3660            HydroNode::ExternalInput { deserialize_fn, .. } => {
3661                if let Some(deserialize_fn) = deserialize_fn {
3662                    transform(deserialize_fn);
3663                }
3664            }
3665            HydroNode::Counter { duration, .. } => {
3666                transform(duration);
3667            }
3668        }
3669    }
3670
3671    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3672        &self.metadata().op
3673    }
3674
3675    pub fn metadata(&self) -> &HydroIrMetadata {
3676        match self {
3677            HydroNode::Placeholder => {
3678                panic!()
3679            }
3680            HydroNode::Cast { metadata, .. } => metadata,
3681            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3682            HydroNode::Source { metadata, .. } => metadata,
3683            HydroNode::SingletonSource { metadata, .. } => metadata,
3684            HydroNode::CycleSource { metadata, .. } => metadata,
3685            HydroNode::Tee { metadata, .. } => metadata,
3686            HydroNode::YieldConcat { metadata, .. } => metadata,
3687            HydroNode::BeginAtomic { metadata, .. } => metadata,
3688            HydroNode::EndAtomic { metadata, .. } => metadata,
3689            HydroNode::Batch { metadata, .. } => metadata,
3690            HydroNode::Chain { metadata, .. } => metadata,
3691            HydroNode::ChainFirst { metadata, .. } => metadata,
3692            HydroNode::CrossProduct { metadata, .. } => metadata,
3693            HydroNode::CrossSingleton { metadata, .. } => metadata,
3694            HydroNode::Join { metadata, .. } => metadata,
3695            HydroNode::Difference { metadata, .. } => metadata,
3696            HydroNode::AntiJoin { metadata, .. } => metadata,
3697            HydroNode::ResolveFutures { metadata, .. } => metadata,
3698            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3699            HydroNode::Map { metadata, .. } => metadata,
3700            HydroNode::FlatMap { metadata, .. } => metadata,
3701            HydroNode::Filter { metadata, .. } => metadata,
3702            HydroNode::FilterMap { metadata, .. } => metadata,
3703            HydroNode::DeferTick { metadata, .. } => metadata,
3704            HydroNode::Enumerate { metadata, .. } => metadata,
3705            HydroNode::Inspect { metadata, .. } => metadata,
3706            HydroNode::Unique { metadata, .. } => metadata,
3707            HydroNode::Sort { metadata, .. } => metadata,
3708            HydroNode::Scan { metadata, .. } => metadata,
3709            HydroNode::Fold { metadata, .. } => metadata,
3710            HydroNode::FoldKeyed { metadata, .. } => metadata,
3711            HydroNode::Reduce { metadata, .. } => metadata,
3712            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3713            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3714            HydroNode::ExternalInput { metadata, .. } => metadata,
3715            HydroNode::Network { metadata, .. } => metadata,
3716            HydroNode::Counter { metadata, .. } => metadata,
3717        }
3718    }
3719
3720    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3721        &mut self.metadata_mut().op
3722    }
3723
3724    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3725        match self {
3726            HydroNode::Placeholder => {
3727                panic!()
3728            }
3729            HydroNode::Cast { metadata, .. } => metadata,
3730            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3731            HydroNode::Source { metadata, .. } => metadata,
3732            HydroNode::SingletonSource { metadata, .. } => metadata,
3733            HydroNode::CycleSource { metadata, .. } => metadata,
3734            HydroNode::Tee { metadata, .. } => metadata,
3735            HydroNode::YieldConcat { metadata, .. } => metadata,
3736            HydroNode::BeginAtomic { metadata, .. } => metadata,
3737            HydroNode::EndAtomic { metadata, .. } => metadata,
3738            HydroNode::Batch { metadata, .. } => metadata,
3739            HydroNode::Chain { metadata, .. } => metadata,
3740            HydroNode::ChainFirst { metadata, .. } => metadata,
3741            HydroNode::CrossProduct { metadata, .. } => metadata,
3742            HydroNode::CrossSingleton { metadata, .. } => metadata,
3743            HydroNode::Join { metadata, .. } => metadata,
3744            HydroNode::Difference { metadata, .. } => metadata,
3745            HydroNode::AntiJoin { metadata, .. } => metadata,
3746            HydroNode::ResolveFutures { metadata, .. } => metadata,
3747            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3748            HydroNode::Map { metadata, .. } => metadata,
3749            HydroNode::FlatMap { metadata, .. } => metadata,
3750            HydroNode::Filter { metadata, .. } => metadata,
3751            HydroNode::FilterMap { metadata, .. } => metadata,
3752            HydroNode::DeferTick { metadata, .. } => metadata,
3753            HydroNode::Enumerate { metadata, .. } => metadata,
3754            HydroNode::Inspect { metadata, .. } => metadata,
3755            HydroNode::Unique { metadata, .. } => metadata,
3756            HydroNode::Sort { metadata, .. } => metadata,
3757            HydroNode::Scan { metadata, .. } => metadata,
3758            HydroNode::Fold { metadata, .. } => metadata,
3759            HydroNode::FoldKeyed { metadata, .. } => metadata,
3760            HydroNode::Reduce { metadata, .. } => metadata,
3761            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3762            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3763            HydroNode::ExternalInput { metadata, .. } => metadata,
3764            HydroNode::Network { metadata, .. } => metadata,
3765            HydroNode::Counter { metadata, .. } => metadata,
3766        }
3767    }
3768
3769    pub fn input(&self) -> Vec<&HydroNode> {
3770        match self {
3771            HydroNode::Placeholder => {
3772                panic!()
3773            }
3774            HydroNode::Source { .. }
3775            | HydroNode::SingletonSource { .. }
3776            | HydroNode::ExternalInput { .. }
3777            | HydroNode::CycleSource { .. }
3778            | HydroNode::Tee { .. } => {
3779                // Tee should find its input in separate special ways
3780                vec![]
3781            }
3782            HydroNode::Cast { inner, .. }
3783            | HydroNode::ObserveNonDet { inner, .. }
3784            | HydroNode::YieldConcat { inner, .. }
3785            | HydroNode::BeginAtomic { inner, .. }
3786            | HydroNode::EndAtomic { inner, .. }
3787            | HydroNode::Batch { inner, .. } => {
3788                vec![inner]
3789            }
3790            HydroNode::Chain { first, second, .. } => {
3791                vec![first, second]
3792            }
3793            HydroNode::ChainFirst { first, second, .. } => {
3794                vec![first, second]
3795            }
3796            HydroNode::CrossProduct { left, right, .. }
3797            | HydroNode::CrossSingleton { left, right, .. }
3798            | HydroNode::Join { left, right, .. } => {
3799                vec![left, right]
3800            }
3801            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3802                vec![pos, neg]
3803            }
3804            HydroNode::Map { input, .. }
3805            | HydroNode::FlatMap { input, .. }
3806            | HydroNode::Filter { input, .. }
3807            | HydroNode::FilterMap { input, .. }
3808            | HydroNode::Sort { input, .. }
3809            | HydroNode::DeferTick { input, .. }
3810            | HydroNode::Enumerate { input, .. }
3811            | HydroNode::Inspect { input, .. }
3812            | HydroNode::Unique { input, .. }
3813            | HydroNode::Network { input, .. }
3814            | HydroNode::Counter { input, .. }
3815            | HydroNode::ResolveFutures { input, .. }
3816            | HydroNode::ResolveFuturesOrdered { input, .. }
3817            | HydroNode::Fold { input, .. }
3818            | HydroNode::FoldKeyed { input, .. }
3819            | HydroNode::Reduce { input, .. }
3820            | HydroNode::ReduceKeyed { input, .. }
3821            | HydroNode::Scan { input, .. } => {
3822                vec![input]
3823            }
3824            HydroNode::ReduceKeyedWatermark {
3825                input, watermark, ..
3826            } => {
3827                vec![input, watermark]
3828            }
3829        }
3830    }
3831
3832    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3833        self.input()
3834            .iter()
3835            .map(|input_node| input_node.metadata())
3836            .collect()
3837    }
3838
3839    pub fn print_root(&self) -> String {
3840        match self {
3841            HydroNode::Placeholder => {
3842                panic!()
3843            }
3844            HydroNode::Cast { .. } => "Cast()".to_owned(),
3845            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3846            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3847            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3848            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3849            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3850            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3851            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3852            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3853            HydroNode::Batch { .. } => "Batch()".to_owned(),
3854            HydroNode::Chain { first, second, .. } => {
3855                format!("Chain({}, {})", first.print_root(), second.print_root())
3856            }
3857            HydroNode::ChainFirst { first, second, .. } => {
3858                format!(
3859                    "ChainFirst({}, {})",
3860                    first.print_root(),
3861                    second.print_root()
3862                )
3863            }
3864            HydroNode::CrossProduct { left, right, .. } => {
3865                format!(
3866                    "CrossProduct({}, {})",
3867                    left.print_root(),
3868                    right.print_root()
3869                )
3870            }
3871            HydroNode::CrossSingleton { left, right, .. } => {
3872                format!(
3873                    "CrossSingleton({}, {})",
3874                    left.print_root(),
3875                    right.print_root()
3876                )
3877            }
3878            HydroNode::Join { left, right, .. } => {
3879                format!("Join({}, {})", left.print_root(), right.print_root())
3880            }
3881            HydroNode::Difference { pos, neg, .. } => {
3882                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3883            }
3884            HydroNode::AntiJoin { pos, neg, .. } => {
3885                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3886            }
3887            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3888            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3889            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3890            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3891            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3892            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3893            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3894            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3895            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3896            HydroNode::Unique { .. } => "Unique()".to_owned(),
3897            HydroNode::Sort { .. } => "Sort()".to_owned(),
3898            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3899            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3900            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3901            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3902            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3903            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3904            HydroNode::Network { .. } => "Network()".to_owned(),
3905            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3906            HydroNode::Counter { tag, duration, .. } => {
3907                format!("Counter({:?}, {:?})", tag, duration)
3908            }
3909        }
3910    }
3911}
3912
/// Allocates a network channel between two top-level locations using the deployment
/// backend `D`.
///
/// Both endpoints are resolved to their instantiated nodes: `processes` serves
/// `LocationId::Process` and `clusters` serves `LocationId::Cluster`. A fresh port is then
/// taken from each node with `next_port()`, and the call is dispatched to the matching
/// `D::{o2o,o2m,m2o,m2m}_sink_source` and `D::{o2o,o2m,m2o,m2m}_connect` pair. In those
/// names `o` is a process and `m` is a cluster; the first letter is the `from` side.
///
/// `name` is forwarded unchanged to the `*_sink_source` call.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by `*_sink_source`
/// and the callback produced by `*_connect`.
///
/// # Panics
///
/// - If a process or cluster referenced by either endpoint is missing from `processes`
///   or `clusters`.
/// - If either endpoint is a `LocationId::Tick` or `LocationId::Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Resolve a location key to its instantiated node. A missing entry means the
    // deployment never created a node the graph refers to, which is a fatal setup error.
    let lookup_process = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    let lookup_cluster = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = lookup_process(from);
            let to_node = lookup_process(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(env, &from_node, &sink_port, &to_node, &source_port, name),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = lookup_process(from);
            let to_node = lookup_cluster(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(env, &from_node, &sink_port, &to_node, &source_port, name),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = lookup_cluster(from);
            let to_node = lookup_process(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(env, &from_node, &sink_port, &to_node, &source_port, name),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = lookup_cluster(from);
            let to_node = lookup_cluster(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(env, &from_node, &sink_port, &to_node, &source_port, name),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only top-level processes and clusters own network ports. Explicit variants
        // (rather than `_`) keep this match exhaustive if `LocationId` grows.
        (LocationId::Tick(..) | LocationId::Atomic(..), _) => {
            panic!(
                "Network channels must originate from a process or cluster, not a tick or atomic location"
            )
        }
        (_, LocationId::Tick(..) | LocationId::Atomic(..)) => {
            panic!(
                "Network channels must target a process or cluster, not a tick or atomic location"
            )
        }
    };
    (sink, source, connect_fn)
}
4021
// Unit tests for the in-memory size of the IR types and for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins `size_of::<HydroNode>()` so any change to the node layout is noticed and the
    // expected value is updated deliberately. The value assumes the `build` feature is on;
    // otherwise the test is ignored.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    // Same as `hydro_node_size`, but for `HydroRoot`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    // An expression that contains no stageleft call must come back from
    // `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        // Test basic non-q! expression
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Snapshot test: the stored snapshot is derived from the tokens inside `q!(...)`, so
    // editing the quoted closure requires regenerating the snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Test a simplified version of what a real stageleft call might look like
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // This should be processed by our visitor and simplified to q!(...)
        // since we detect the stageleft::runtime_support::fn_* pattern
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Snapshot test for a quoted expression that is a block evaluating to a `move`
    // closure, so its tokens begin with `{` rather than `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        // Test a closure that does not start with a pipe
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}