1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
/// Newtype around a boxed [`syn::Expr`].
///
/// The wrapper exists so that the expression can carry custom formatting:
/// the `Debug` impl in this file prints the raw token stream, while the
/// `Display` impl prints a simplified `q!(...)` form.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42 fn from(expr: syn::Expr) -> Self {
43 Self(Box::new(expr))
44 }
45}
46
47impl Deref for DebugExpr {
48 type Target = syn::Expr;
49
50 fn deref(&self) -> &Self::Target {
51 &self.0
52 }
53}
54
55impl ToTokens for DebugExpr {
56 fn to_tokens(&self, tokens: &mut TokenStream) {
57 self.0.to_tokens(tokens);
58 }
59}
60
61impl Debug for DebugExpr {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.0.to_token_stream())
64 }
65}
66
67impl Display for DebugExpr {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 let original = self.0.as_ref().clone();
70 let simplified = simplify_q_macro(original);
71
72 write!(f, "q!({})", quote::quote!(#simplified))
75 }
76}
77
78fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80 let mut simplifier = QMacroSimplifier::new();
83 simplifier.visit_expr_mut(&mut expr);
84
85 if let Some(simplified) = simplifier.simplified_result {
87 simplified
88 } else {
89 expr
90 }
91}
92
/// Expression visitor used by `simplify_q_macro`.
///
/// After a visit, `simplified_result` holds the closure extracted from the
/// first matching `stageleft::runtime_support::*_type_hint*` call, if any.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted expression; `None` until a matching call has been seen.
    pub simplified_result: Option<syn::Expr>,
}
98
99impl QMacroSimplifier {
100 pub fn new() -> Self {
101 Self::default()
102 }
103}
104
105impl VisitMut for QMacroSimplifier {
106 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107 if self.simplified_result.is_some() {
109 return;
110 }
111
112 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113 && self.is_stageleft_runtime_support_call(&path_expr.path)
115 && let Some(closure) = self.extract_closure_from_args(&call.args)
117 {
118 self.simplified_result = Some(closure);
119 return;
120 }
121
122 syn::visit_mut::visit_expr_mut(self, expr);
125 }
126}
127
128impl QMacroSimplifier {
129 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130 if let Some(last_segment) = path.segments.last() {
132 let fn_name = last_segment.ident.to_string();
133 fn_name.contains("_type_hint")
135 && path.segments.len() > 2
136 && path.segments[0].ident == "stageleft"
137 && path.segments[1].ident == "runtime_support"
138 } else {
139 false
140 }
141 }
142
143 fn extract_closure_from_args(
144 &self,
145 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146 ) -> Option<syn::Expr> {
147 for arg in args {
149 if let syn::Expr::Closure(_) = arg {
150 return Some(arg.clone());
151 }
152 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154 return Some(closure_expr);
155 }
156 }
157 None
158 }
159
160 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161 let mut visitor = ClosureFinder {
162 found_closure: None,
163 prefer_inner_blocks: true,
164 };
165 visitor.visit_expr(expr);
166 visitor.found_closure
167 }
168}
169
/// Read-only visitor that records the first closure-bearing expression it
/// encounters.
struct ClosureFinder {
    /// The expression recorded so far; once set, further visits do nothing.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block whose statement is itself a block containing a
    /// closure records that whole inner block instead of the bare closure.
    prefer_inner_blocks: bool,
}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178 if self.found_closure.is_some() {
180 return;
181 }
182
183 match expr {
184 syn::Expr::Closure(_) => {
185 self.found_closure = Some(expr.clone());
186 }
187 syn::Expr::Block(block) if self.prefer_inner_blocks => {
188 for stmt in &block.block.stmts {
190 if let syn::Stmt::Expr(stmt_expr, _) = stmt
191 && let syn::Expr::Block(_) = stmt_expr
192 {
193 let mut inner_visitor = ClosureFinder {
195 found_closure: None,
196 prefer_inner_blocks: false, };
198 inner_visitor.visit_expr(stmt_expr);
199 if inner_visitor.found_closure.is_some() {
200 self.found_closure = Some(stmt_expr.clone());
202 return;
203 }
204 }
205 }
206
207 visit::visit_expr(self, expr);
209
210 if self.found_closure.is_some() {
213 }
215 }
216 _ => {
217 visit::visit_expr(self, expr);
219 }
220 }
221 }
222}
223
/// Newtype around a boxed [`syn::Type`].
///
/// The `Debug` impl in this file prints the type as its token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231 fn from(t: syn::Type) -> Self {
232 Self(Box::new(t))
233 }
234}
235
236impl Deref for DebugType {
237 type Target = syn::Type;
238
239 fn deref(&self) -> &Self::Target {
240 &self.0
241 }
242}
243
244impl ToTokens for DebugType {
245 fn to_tokens(&self, tokens: &mut TokenStream) {
246 self.0.to_tokens(tokens);
247 }
248}
249
250impl Debug for DebugType {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(f, "{}", self.0.to_token_stream())
253 }
254}
255
/// Instantiation state of a network-like IR element.
///
/// Elements start as `Building`; `HydroRoot::compile_network` replaces that
/// with `Finalized`, and `HydroRoot::connect_network` later consumes the
/// stored connect callback.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated: holds the sink/source expressions and the connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
260
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression used as the sink side when DFIR is emitted.
    sink: syn::Expr,
    /// Expression used as the source side when DFIR is emitted.
    source: syn::Expr,
    /// One-shot callback; `connect_network` takes it out of the `Option` and
    /// calls it, leaving `None` behind.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275 fn from(f: DebugInstantiateFinalized) -> Self {
276 Self::Finalized(Box::new(f))
277 }
278}
279
280impl Debug for DebugInstantiate {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 write!(f, "<network instantiate>")
283 }
284}
285
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so this value never affects the hash of
    /// a containing type.
    // NOTE(review): presumably intentional so that instantiation state does not
    // influence IR hashes — confirm.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
291
292impl Clone for DebugInstantiate {
293 fn clone(&self) -> Self {
294 match self {
295 DebugInstantiate::Building => DebugInstantiate::Building,
296 DebugInstantiate::Finalized(_) => {
297 panic!("DebugInstantiate::Finalized should not be cloned")
298 }
299 }
300 }
301}
302
/// State of a `HydroSource::ClusterMembers` source, advanced by
/// `HydroRoot::compile_network`.
#[derive(Debug, Hash, Clone)]
pub enum ClusterMembersState {
    /// Not yet processed by `compile_network`.
    Uninit,
    /// First occurrence of its (location, cluster) pair: holds the spliced
    /// membership-stream expression.
    Stream(DebugExpr),
    /// Later occurrence of an already-seen pair: records the location the
    /// source is at and the cluster's location id.
    Tee(LocationId, LocationId),
}
323
/// The kinds of source a `HydroNode::Source` can wrap.
// NOTE(review): per-variant semantics below are inferred from names and from
// how `compile_network` treats them — confirm against the emitting code.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression (presumably evaluating to a stream).
    Stream(DebugExpr),
    /// Source with no payload; presumably fed by an external network.
    ExternalNetwork(),
    /// Source described by an expression (presumably evaluating to an iterator).
    Iter(DebugExpr),
    /// Source with no payload.
    Spin(),
    /// Membership of a cluster: the cluster's location id and the compile state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded input identified by `ident`; `compile_network` registers it
    /// via `D::register_embedded_input`.
    Embedded(syn::Ident),
}
334
/// Backend interface used while emitting DFIR from the Hydro IR.
///
/// Implementors own one [`FlatGraphBuilder`] per location and decide how each
/// structural operation is lowered. The implementation for
/// `SecondaryMap<LocationKey, FlatGraphBuilder>` in this file is the reference
/// lowering.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Backend capability flag.
    // NOTE(review): exact meaning is not visible in this file; the
    // `SecondaryMap` implementation returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Lowers a batch operation that turns `in_ident` (of kind `in_kind`, at
    /// `in_location`) into `out_ident` at `out_location`.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Lowers a yield-from-tick operation from `in_ident` to `out_ident`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Lowers the start of an atomic section from `in_ident` to `out_ident`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Lowers the end of an atomic section from `in_ident` to `out_ident`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Lowers a nondeterminism observation at `location` from `in_ident`
    /// (kind `in_kind`) to `out_ident` (kind `out_kind`).
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Lowers a network hop: sends `input_ident` from `from` through `sink`
    /// (optionally mapped through `serialize`) and binds `out_ident` at `to`
    /// from `source` (optionally mapped through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Binds `out_ident` on location `on` from the external `source_expr`,
    /// optionally mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Sends `input_ident` on location `on` into the external `sink_expr`,
    /// optionally mapped through `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
427
/// Reference lowering: one [`FlatGraphBuilder`] per root location key, with
/// every structural operation lowered to plain DFIR statements.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder for the root of `location`, creating a default one
    /// on first use.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the key's entry is unavailable.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Emits `out = in -> persist::<'static>()` for bounded singleton-like
    /// inputs (which must be at a top-level location), and a plain
    /// `out = in` alias otherwise.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if needs_persist {
            assert!(in_location.is_top_level());
        }

        builder.add_dfir(
            if needs_persist {
                parse_quote! {
                    #out_ident = #in_ident -> persist::<'static>();
                }
            } else {
                parse_quote! {
                    #out_ident = #in_ident;
                }
            },
            None,
            None,
        );
    }

    /// Emits a plain `out = in` alias on the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a plain `out = in` alias on the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a plain `out = in` alias on the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a plain `out = in` alias on `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits the sending half on `from` (tagged `send<tag_id>`) and the
    /// receiving half on `to` (tagged `recv<tag_id>`), inserting a `map` stage
    /// on either side when the corresponding (de)serializer is given.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        self.get_dfir_mut(from).add_dfir(
            match serialize {
                Some(serialize_pipeline) => parse_quote! {
                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                },
                None => parse_quote! {
                    #input_ident -> dest_sink(#sink);
                },
            },
            None,
            Some(send_tag.as_str()),
        );

        self.get_dfir_mut(to).add_dfir(
            match deserialize {
                Some(deserialize_pipeline) => parse_quote! {
                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                },
                None => parse_quote! {
                    #out_ident = source_stream(#source);
                },
            },
            None,
            Some(recv_tag.as_str()),
        );
    }

    /// Binds `out_ident` from `source_stream(source_expr)` on `on`, tagged
    /// `recv<tag_id>`, with an optional `map(deserialize)` stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);

        self.get_dfir_mut(on).add_dfir(
            match deserialize {
                Some(deserialize_pipeline) => parse_quote! {
                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                },
                None => parse_quote! {
                    #out_ident = source_stream(#source_expr);
                },
            },
            None,
            Some(recv_tag.as_str()),
        );
    }

    /// Pipes `input_ident` into `dest_sink(sink_expr)` on `on`, tagged
    /// `send<tag_id>`, with an optional `map(serialize)` stage.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);

        self.get_dfir_mut(on).add_dfir(
            match serialize {
                Some(serialize_fn) => parse_quote! {
                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                },
                None => parse_quote! {
                    #input_ident -> dest_sink(#sink_expr);
                },
            },
            None,
            Some(send_tag.as_str()),
        );
    }
}
660
/// Selects what `emit_core` does at each IR element: emit DFIR into a
/// [`DfirBuilder`], or only invoke traversal callbacks.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Emit nothing; call `L` for roots (with the current statement id).
    /// `N` is the node-level counterpart.
    Callback(L, N),
}
670
/// A root (terminal) element of the Hydro IR: every variant consumes a single
/// `input` node and carries operator metadata.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to an external; emitted through
    /// `DfirBuilder::create_external_output`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        /// `Building` until `compile_network` finalizes it.
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Binds the cycle's identifier to the input (through a typed `identity`).
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> for_each(&mut ident)`; registered with the deploy
    /// backend by `compile_network`.
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
707
708impl HydroRoot {
709 #[cfg(feature = "build")]
710 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
711 pub fn compile_network<'a, D>(
712 &mut self,
713 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
714 seen_tees: &mut SeenTees,
715 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
716 processes: &SparseSecondaryMap<LocationKey, D::Process>,
717 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
718 externals: &SparseSecondaryMap<LocationKey, D::External>,
719 env: &mut D::InstantiateEnv,
720 ) where
721 D: Deploy<'a>,
722 {
723 let refcell_extra_stmts = RefCell::new(extra_stmts);
724 let refcell_env = RefCell::new(env);
725 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
726 self.transform_bottom_up(
727 &mut |l| {
728 if let HydroRoot::SendExternal {
729 input,
730 to_external_key,
731 to_port_id,
732 to_many,
733 unpaired,
734 instantiate_fn,
735 ..
736 } = l
737 {
738 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
739 DebugInstantiate::Building => {
740 let to_node = externals
741 .get(*to_external_key)
742 .unwrap_or_else(|| {
743 panic!("A external used in the graph was not instantiated: {}", to_external_key)
744 })
745 .clone();
746
747 match input.metadata().location_id.root() {
748 &LocationId::Process(process_key) => {
749 if *to_many {
750 (
751 (
752 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
753 parse_quote!(DUMMY),
754 ),
755 Box::new(|| {}) as Box<dyn FnOnce()>,
756 )
757 } else {
758 let from_node = processes
759 .get(process_key)
760 .unwrap_or_else(|| {
761 panic!("A process used in the graph was not instantiated: {}", process_key)
762 })
763 .clone();
764
765 let sink_port = from_node.next_port();
766 let source_port = to_node.next_port();
767
768 if *unpaired {
769 use stageleft::quote_type;
770 use tokio_util::codec::LengthDelimitedCodec;
771
772 to_node.register(*to_port_id, source_port.clone());
773
774 let _ = D::e2o_source(
775 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
776 &to_node, &source_port,
777 &from_node, &sink_port,
778 "e_type::<LengthDelimitedCodec>(),
779 format!("{}_{}", *to_external_key, *to_port_id)
780 );
781 }
782
783 (
784 (
785 D::o2e_sink(
786 &from_node,
787 &sink_port,
788 &to_node,
789 &source_port,
790 format!("{}_{}", *to_external_key, *to_port_id)
791 ),
792 parse_quote!(DUMMY),
793 ),
794 if *unpaired {
795 D::e2o_connect(
796 &to_node,
797 &source_port,
798 &from_node,
799 &sink_port,
800 *to_many,
801 NetworkHint::Auto,
802 )
803 } else {
804 Box::new(|| {}) as Box<dyn FnOnce()>
805 },
806 )
807 }
808 }
809 LocationId::Cluster(_) => todo!(),
810 _ => panic!()
811 }
812 },
813
814 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
815 };
816
817 *instantiate_fn = DebugInstantiateFinalized {
818 sink: sink_expr,
819 source: source_expr,
820 connect_fn: Some(connect_fn),
821 }
822 .into();
823 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
824 let element_type = match &input.metadata().collection_kind {
825 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
826 _ => panic!("Embedded output must have Stream collection kind"),
827 };
828 let location_key = match input.metadata().location_id.root() {
829 LocationId::Process(key) | LocationId::Cluster(key) => *key,
830 _ => panic!("Embedded output must be on a process or cluster"),
831 };
832 D::register_embedded_output(
833 &mut refcell_env.borrow_mut(),
834 location_key,
835 ident,
836 &element_type,
837 );
838 }
839 },
840 &mut |n| {
841 if let HydroNode::Network {
842 name,
843 input,
844 instantiate_fn,
845 metadata,
846 ..
847 } = n
848 {
849 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
850 DebugInstantiate::Building => instantiate_network::<D>(
851 &mut refcell_env.borrow_mut(),
852 input.metadata().location_id.root(),
853 metadata.location_id.root(),
854 processes,
855 clusters,
856 name.as_deref(),
857 ),
858
859 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
860 };
861
862 *instantiate_fn = DebugInstantiateFinalized {
863 sink: sink_expr,
864 source: source_expr,
865 connect_fn: Some(connect_fn),
866 }
867 .into();
868 } else if let HydroNode::ExternalInput {
869 from_external_key,
870 from_port_id,
871 from_many,
872 codec_type,
873 port_hint,
874 instantiate_fn,
875 metadata,
876 ..
877 } = n
878 {
879 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
880 DebugInstantiate::Building => {
881 let from_node = externals
882 .get(*from_external_key)
883 .unwrap_or_else(|| {
884 panic!(
885 "A external used in the graph was not instantiated: {}",
886 from_external_key,
887 )
888 })
889 .clone();
890
891 match metadata.location_id.root() {
892 &LocationId::Process(process_key) => {
893 let to_node = processes
894 .get(process_key)
895 .unwrap_or_else(|| {
896 panic!("A process used in the graph was not instantiated: {}", process_key)
897 })
898 .clone();
899
900 let sink_port = from_node.next_port();
901 let source_port = to_node.next_port();
902
903 from_node.register(*from_port_id, sink_port.clone());
904
905 (
906 (
907 parse_quote!(DUMMY),
908 if *from_many {
909 D::e2o_many_source(
910 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
911 &to_node, &source_port,
912 codec_type.0.as_ref(),
913 format!("{}_{}", *from_external_key, *from_port_id)
914 )
915 } else {
916 D::e2o_source(
917 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
918 &from_node, &sink_port,
919 &to_node, &source_port,
920 codec_type.0.as_ref(),
921 format!("{}_{}", *from_external_key, *from_port_id)
922 )
923 },
924 ),
925 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
926 )
927 }
928 LocationId::Cluster(_) => todo!(),
929 _ => panic!()
930 }
931 },
932
933 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
934 };
935
936 *instantiate_fn = DebugInstantiateFinalized {
937 sink: sink_expr,
938 source: source_expr,
939 connect_fn: Some(connect_fn),
940 }
941 .into();
942 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
943 let element_type = match &metadata.collection_kind {
944 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
945 _ => panic!("Embedded source must have Stream collection kind"),
946 };
947 let location_key = match metadata.location_id.root() {
948 LocationId::Process(key) | LocationId::Cluster(key) => *key,
949 _ => panic!("Embedded source must be on a process or cluster"),
950 };
951 D::register_embedded_input(
952 &mut refcell_env.borrow_mut(),
953 location_key,
954 ident,
955 &element_type,
956 );
957 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
958 match state {
959 ClusterMembersState::Uninit => {
960 let at_location = metadata.location_id.root().clone();
961 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
962 if refcell_seen_cluster_members.borrow_mut().insert(key) {
963 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
965 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
966 &(),
967 );
968 *state = ClusterMembersState::Stream(expr.into());
969 } else {
970 *state = ClusterMembersState::Tee(at_location, location_id.clone());
972 }
973 }
974 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
975 panic!("cluster members already finalized");
976 }
977 }
978 }
979 },
980 seen_tees,
981 false,
982 );
983 }
984
985 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
986 self.transform_bottom_up(
987 &mut |l| {
988 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
989 match instantiate_fn {
990 DebugInstantiate::Building => panic!("network not built"),
991
992 DebugInstantiate::Finalized(finalized) => {
993 (finalized.connect_fn.take().unwrap())();
994 }
995 }
996 }
997 },
998 &mut |n| {
999 if let HydroNode::Network { instantiate_fn, .. }
1000 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1001 {
1002 match instantiate_fn {
1003 DebugInstantiate::Building => panic!("network not built"),
1004
1005 DebugInstantiate::Finalized(finalized) => {
1006 (finalized.connect_fn.take().unwrap())();
1007 }
1008 }
1009 }
1010 },
1011 seen_tees,
1012 false,
1013 );
1014 }
1015
1016 pub fn transform_bottom_up(
1017 &mut self,
1018 transform_root: &mut impl FnMut(&mut HydroRoot),
1019 transform_node: &mut impl FnMut(&mut HydroNode),
1020 seen_tees: &mut SeenTees,
1021 check_well_formed: bool,
1022 ) {
1023 self.transform_children(
1024 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1025 seen_tees,
1026 );
1027
1028 transform_root(self);
1029 }
1030
1031 pub fn transform_children(
1032 &mut self,
1033 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1034 seen_tees: &mut SeenTees,
1035 ) {
1036 match self {
1037 HydroRoot::ForEach { input, .. }
1038 | HydroRoot::SendExternal { input, .. }
1039 | HydroRoot::DestSink { input, .. }
1040 | HydroRoot::CycleSink { input, .. }
1041 | HydroRoot::EmbeddedOutput { input, .. } => {
1042 transform(input, seen_tees);
1043 }
1044 }
1045 }
1046
1047 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
1048 match self {
1049 HydroRoot::ForEach {
1050 f,
1051 input,
1052 op_metadata,
1053 } => HydroRoot::ForEach {
1054 f: f.clone(),
1055 input: Box::new(input.deep_clone(seen_tees)),
1056 op_metadata: op_metadata.clone(),
1057 },
1058 HydroRoot::SendExternal {
1059 to_external_key,
1060 to_port_id,
1061 to_many,
1062 unpaired,
1063 serialize_fn,
1064 instantiate_fn,
1065 input,
1066 op_metadata,
1067 } => HydroRoot::SendExternal {
1068 to_external_key: *to_external_key,
1069 to_port_id: *to_port_id,
1070 to_many: *to_many,
1071 unpaired: *unpaired,
1072 serialize_fn: serialize_fn.clone(),
1073 instantiate_fn: instantiate_fn.clone(),
1074 input: Box::new(input.deep_clone(seen_tees)),
1075 op_metadata: op_metadata.clone(),
1076 },
1077 HydroRoot::DestSink {
1078 sink,
1079 input,
1080 op_metadata,
1081 } => HydroRoot::DestSink {
1082 sink: sink.clone(),
1083 input: Box::new(input.deep_clone(seen_tees)),
1084 op_metadata: op_metadata.clone(),
1085 },
1086 HydroRoot::CycleSink {
1087 cycle_id,
1088 input,
1089 op_metadata,
1090 } => HydroRoot::CycleSink {
1091 cycle_id: *cycle_id,
1092 input: Box::new(input.deep_clone(seen_tees)),
1093 op_metadata: op_metadata.clone(),
1094 },
1095 HydroRoot::EmbeddedOutput {
1096 ident,
1097 input,
1098 op_metadata,
1099 } => HydroRoot::EmbeddedOutput {
1100 ident: ident.clone(),
1101 input: Box::new(input.deep_clone(seen_tees)),
1102 op_metadata: op_metadata.clone(),
1103 },
1104 }
1105 }
1106
1107 #[cfg(feature = "build")]
1108 pub fn emit(
1109 &mut self,
1110 graph_builders: &mut dyn DfirBuilder,
1111 seen_tees: &mut SeenTees,
1112 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1113 next_stmt_id: &mut usize,
1114 ) {
1115 self.emit_core(
1116 &mut BuildersOrCallback::<
1117 fn(&mut HydroRoot, &mut usize),
1118 fn(&mut HydroNode, &mut usize),
1119 >::Builders(graph_builders),
1120 seen_tees,
1121 built_tees,
1122 next_stmt_id,
1123 );
1124 }
1125
    /// Emits this root (after emitting its input subtree) in either mode of
    /// `builders_or_callback`.
    ///
    /// In `Builders` mode a DFIR statement is added to the builder of the
    /// input's location; in `Callback` mode the root callback is invoked with
    /// this root and the current statement id instead.
    ///
    /// `next_stmt_id` is incremented once per root, except for `CycleSink`,
    /// which neither consumes an id nor invokes the callback.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        seen_tees: &mut SeenTees,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                // Emit the upstream subtree first; it yields the identifier
                // this root consumes.
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink side is used for an output; placeholder
                        // expressions are substituted when not yet finalized.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Element type carried by the cycle: keyed collections
                        // flow as `(key, value)` tuples.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // Cycle sinks are not reported to the callback and do not
                    // consume a statement id.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }
        }
    }
1293
1294 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1295 match self {
1296 HydroRoot::ForEach { op_metadata, .. }
1297 | HydroRoot::SendExternal { op_metadata, .. }
1298 | HydroRoot::DestSink { op_metadata, .. }
1299 | HydroRoot::CycleSink { op_metadata, .. }
1300 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1301 }
1302 }
1303
1304 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1305 match self {
1306 HydroRoot::ForEach { op_metadata, .. }
1307 | HydroRoot::SendExternal { op_metadata, .. }
1308 | HydroRoot::DestSink { op_metadata, .. }
1309 | HydroRoot::CycleSink { op_metadata, .. }
1310 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1311 }
1312 }
1313
1314 pub fn input(&self) -> &HydroNode {
1315 match self {
1316 HydroRoot::ForEach { input, .. }
1317 | HydroRoot::SendExternal { input, .. }
1318 | HydroRoot::DestSink { input, .. }
1319 | HydroRoot::CycleSink { input, .. }
1320 | HydroRoot::EmbeddedOutput { input, .. } => input,
1321 }
1322 }
1323
1324 pub fn input_metadata(&self) -> &HydroIrMetadata {
1325 self.input().metadata()
1326 }
1327
1328 pub fn print_root(&self) -> String {
1329 match self {
1330 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1331 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1332 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1333 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1334 HydroRoot::EmbeddedOutput { ident, .. } => {
1335 format!("EmbeddedOutput({})", ident)
1336 }
1337 }
1338 }
1339
1340 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1341 match self {
1342 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1343 transform(f);
1344 }
1345 HydroRoot::SendExternal { .. }
1346 | HydroRoot::CycleSink { .. }
1347 | HydroRoot::EmbeddedOutput { .. } => {}
1348 }
1349 }
1350}
1351
/// Emits every root in `ir` into per-location DFIR graph builders and returns them.
///
/// One set of tee-tracking maps and one statement-id counter (starting at 0) is shared
/// across all roots, so they are threaded through each `HydroRoot::emit` call in order.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut graph_builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut graph_builders,
            &mut seen_tees,
            &mut built_tees,
            &mut stmt_counter,
        );
    });

    graph_builders
}
1368
/// Walks `ir` with the same traversal used for emission, but invokes callbacks instead
/// of building graphs.
///
/// `transform_root` and `transform_node` are wrapped in `BuildersOrCallback::Callback`
/// and handed to each root's `emit_core`. Both callbacks receive the statement-id
/// counter, which starts at 0 and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut stmt_counter = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callbacks,
            &mut seen_tees,
            &mut built_tees,
            &mut stmt_counter,
        );
    }
}
1388
1389pub fn transform_bottom_up(
1390 ir: &mut [HydroRoot],
1391 transform_root: &mut impl FnMut(&mut HydroRoot),
1392 transform_node: &mut impl FnMut(&mut HydroNode),
1393 check_well_formed: bool,
1394) {
1395 let mut seen_tees = HashMap::new();
1396 ir.iter_mut().for_each(|leaf| {
1397 leaf.transform_bottom_up(
1398 transform_root,
1399 transform_node,
1400 &mut seen_tees,
1401 check_well_formed,
1402 );
1403 });
1404}
1405
1406pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1407 let mut seen_tees = HashMap::new();
1408 ir.iter()
1409 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1410 .collect()
1411}
1412
// Per-thread state used to de-duplicate tees in `Debug` output.
// `None` means de-duplication is off. `Some((next_id, assigned))` holds the next id to
// hand out and the ids already assigned, keyed by the address of the tee's `RefCell`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts as `None`; `dbg_dedup_tee` switches it to `Some` while its closure runs, and
    // `TeeNode`'s `Debug` impl reads and updates it.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1417
1418pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1419 PRINTED_TEES.with(|printed_tees| {
1420 let mut printed_tees_mut = printed_tees.borrow_mut();
1421 *printed_tees_mut = Some((0, HashMap::new()));
1422 drop(printed_tees_mut);
1423
1424 let ret = f();
1425
1426 let mut printed_tees_mut = printed_tees.borrow_mut();
1427 *printed_tees_mut = None;
1428
1429 ret
1430 })
1431}
1432
/// Reference-counted, interior-mutable handle to a node, used as the `inner` of
/// `HydroNode::Tee`.
///
/// Traversals identify a tee by the address of the shared `RefCell` (see `as_ptr`), so
/// several `Tee` nodes holding clones of the same `Rc` are treated as the same tee.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1434
1435impl TeeNode {
1436 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1437 Rc::as_ptr(&self.0)
1438 }
1439}
1440
1441impl Debug for TeeNode {
1442 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1443 PRINTED_TEES.with(|printed_tees| {
1444 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1445 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1446
1447 if let Some(printed_tees_mut) = printed_tees_mut {
1448 if let Some(existing) = printed_tees_mut
1449 .1
1450 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1451 {
1452 write!(f, "<tee {}>", existing)
1453 } else {
1454 let next_id = printed_tees_mut.0;
1455 printed_tees_mut.0 += 1;
1456 printed_tees_mut
1457 .1
1458 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1459 drop(printed_tees_mut_borrow);
1460 write!(f, "<tee {}>: ", next_id)?;
1461 Debug::fmt(&self.0.borrow(), f)
1462 }
1463 } else {
1464 drop(printed_tees_mut_borrow);
1465 write!(f, "<tee>: ")?;
1466 Debug::fmt(&self.0.borrow(), f)
1467 }
1468 })
1469 }
1470}
1471
1472impl Hash for TeeNode {
1473 fn hash<H: Hasher>(&self, state: &mut H) {
1474 self.0.borrow_mut().hash(state);
1475 }
1476}
1477
/// Boundedness marker stored in the `bound` field of most `CollectionKind` variants.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1483
/// Ordering marker stored in `CollectionKind::Stream::order` and
/// `CollectionKind::KeyedStream::value_order`.
// NOTE(review): presumably `NoOrder` means element order is not guaranteed and
// `TotalOrder` means it is; confirm against the stream API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1489
/// Delivery marker stored in `CollectionKind::Stream::retry` and
/// `CollectionKind::KeyedStream::value_retry`.
// NOTE(review): presumably `AtLeastOnce` allows duplicate deliveries while
// `ExactlyOnce` does not; confirm against the stream API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1495
/// Boundedness marker stored in `CollectionKind::KeyedSingleton::bound`.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`; `BoundedValue` is
/// treated as not bounded there.
// NOTE(review): `BoundedValue` presumably means each value is bounded while the key
// set is not; confirm.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1502
/// Describes what kind of collection a node produces, together with its type-level
/// properties. Stored in `HydroIrMetadata::collection_kind`.
///
/// Each variant records a boundedness marker and the element (or key/value) types as
/// `DebugType`s; the stream variants additionally record ordering and retry markers.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// Unkeyed stream of `element_type` items.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// Single value of `element_type`.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// Optional value of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// Stream with a key type and a value type; ordering and retry markers are tracked
    /// for the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// Keyed singleton; uses the three-state `KeyedSingletonBoundKind` rather than
    /// `BoundKind`.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1532
1533impl CollectionKind {
1534 pub fn is_bounded(&self) -> bool {
1535 matches!(
1536 self,
1537 CollectionKind::Stream {
1538 bound: BoundKind::Bounded,
1539 ..
1540 } | CollectionKind::Singleton {
1541 bound: BoundKind::Bounded,
1542 ..
1543 } | CollectionKind::Optional {
1544 bound: BoundKind::Bounded,
1545 ..
1546 } | CollectionKind::KeyedStream {
1547 bound: BoundKind::Bounded,
1548 ..
1549 } | CollectionKind::KeyedSingleton {
1550 bound: KeyedSingletonBoundKind::Bounded,
1551 ..
1552 }
1553 )
1554 }
1555}
1556
/// Metadata carried by every non-placeholder `HydroNode` variant in its `metadata`
/// field.
///
/// The hand-written `Hash`, `PartialEq` and `Debug` impls in this file deliberately
/// ignore most or all of these fields, so metadata does not affect node hashing or
/// equality.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location the node is placed at; compared between a node and its inputs by the
    /// well-formedness check in `HydroNode::transform_bottom_up`.
    pub location_id: LocationId,
    /// Kind and type-level properties of the collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated/measured output cardinality; not read in the
    // code visible here.
    pub cardinality: Option<usize>,
    // NOTE(review): optional free-form label; not read in the code visible here.
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, profiling numbers, id).
    pub op: HydroIrOpMetadata,
}
1565
1566impl Hash for HydroIrMetadata {
1568 fn hash<H: Hasher>(&self, _: &mut H) {}
1569}
1570
1571impl PartialEq for HydroIrMetadata {
1572 fn eq(&self, _: &Self) -> bool {
1573 true
1574 }
1575}
1576
// `eq` is constantly `true`, which is trivially reflexive, symmetric and transitive.
impl Eq for HydroIrMetadata {}
1578
1579impl Debug for HydroIrMetadata {
1580 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1581 f.debug_struct("HydroIrMetadata")
1582 .field("location_id", &self.location_id)
1583 .field("collection_kind", &self.collection_kind)
1584 .finish()
1585 }
1586}
1587
/// Operator-level metadata, stored on every `HydroRoot` (`op_metadata`) and inside
/// `HydroIrMetadata` (`op`).
///
/// The `Debug` impl prints none of these fields and the `Hash` impl hashes none of them.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed (see `new`).
    pub backtrace: Backtrace,
    // NOTE(review): the three fields below start as `None` in `new`; presumably they are
    // filled in by later profiling / numbering passes. Confirm against callers.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
1597
1598impl HydroIrOpMetadata {
1599 #[expect(
1600 clippy::new_without_default,
1601 reason = "explicit calls to new ensure correct backtrace bounds"
1602 )]
1603 pub fn new() -> HydroIrOpMetadata {
1604 Self::new_with_skip(1)
1605 }
1606
1607 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1608 HydroIrOpMetadata {
1609 backtrace: Backtrace::get_backtrace(2 + skip_count),
1610 cpu_usage: None,
1611 network_recv_cpu_usage: None,
1612 id: None,
1613 }
1614 }
1615}
1616
1617impl Debug for HydroIrOpMetadata {
1618 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1619 f.debug_struct("HydroIrOpMetadata").finish()
1620 }
1621}
1622
1623impl Hash for HydroIrOpMetadata {
1624 fn hash<H: Hasher>(&self, _: &mut H) {}
1625}
1626
/// A node of the Hydro IR tree.
///
/// Every variant except `Placeholder` carries a `metadata: HydroIrMetadata`. Child nodes
/// are owned through `Box<HydroNode>`, except for `Tee`, whose child is shared through a
/// `TeeNode` (`Rc<RefCell<HydroNode>>`).
///
/// `Clone` is intentionally not derived here; use `deep_clone`, which keeps tee sharing
/// intact.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in that is swapped into a tee's cell while that tee is being
    /// transformed or cloned. `transform_children` panics if asked to process one.
    Placeholder,

    // --- Single-child wrappers whose child field is named `inner` ---
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    // --- Leaves: `transform_children` does not recurse into these ---
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Shared child: several `Tee` nodes may hold clones of the same `TeeNode`.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // --- Two-child nodes; traversals visit the children in declaration order ---
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // --- Single-child operators whose child field is named `input` ---
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Two children: traversals visit `input` first, then `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the location check in `transform_bottom_up`: its
    /// input may live at a different root location than the node itself.
    Network {
        name: Option<String>,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf: has no child node.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1856
/// Maps the address of an original tee cell to the cell that replaces it, so that a tee
/// reached several times during a transform or deep clone is processed once and all
/// references end up sharing the same replacement.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a `LocationId`.
// NOTE(review): not used in the code visible here; presumably records the location a
// tee was first seen at. Confirm against callers.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1859
1860impl HydroNode {
1861 pub fn transform_bottom_up(
1862 &mut self,
1863 transform: &mut impl FnMut(&mut HydroNode),
1864 seen_tees: &mut SeenTees,
1865 check_well_formed: bool,
1866 ) {
1867 self.transform_children(
1868 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1869 seen_tees,
1870 );
1871
1872 transform(self);
1873
1874 let self_location = self.metadata().location_id.root();
1875
1876 if check_well_formed {
1877 match &*self {
1878 HydroNode::Network { .. } => {}
1879 _ => {
1880 self.input_metadata().iter().for_each(|i| {
1881 if i.location_id.root() != self_location {
1882 panic!(
1883 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1884 i,
1885 i.location_id.root(),
1886 self,
1887 self_location
1888 )
1889 }
1890 });
1891 }
1892 }
1893 }
1894 }
1895
1896 #[inline(always)]
1897 pub fn transform_children(
1898 &mut self,
1899 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1900 seen_tees: &mut SeenTees,
1901 ) {
1902 match self {
1903 HydroNode::Placeholder => {
1904 panic!();
1905 }
1906
1907 HydroNode::Source { .. }
1908 | HydroNode::SingletonSource { .. }
1909 | HydroNode::CycleSource { .. }
1910 | HydroNode::ExternalInput { .. } => {}
1911
1912 HydroNode::Tee { inner, .. } => {
1913 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1914 *inner = TeeNode(transformed.clone());
1915 } else {
1916 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1917 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1918 let mut orig = inner.0.replace(HydroNode::Placeholder);
1919 transform(&mut orig, seen_tees);
1920 *transformed_cell.borrow_mut() = orig;
1921 *inner = TeeNode(transformed_cell);
1922 }
1923 }
1924
1925 HydroNode::Cast { inner, .. }
1926 | HydroNode::ObserveNonDet { inner, .. }
1927 | HydroNode::BeginAtomic { inner, .. }
1928 | HydroNode::EndAtomic { inner, .. }
1929 | HydroNode::Batch { inner, .. }
1930 | HydroNode::YieldConcat { inner, .. } => {
1931 transform(inner.as_mut(), seen_tees);
1932 }
1933
1934 HydroNode::Chain { first, second, .. } => {
1935 transform(first.as_mut(), seen_tees);
1936 transform(second.as_mut(), seen_tees);
1937 }
1938
1939 HydroNode::ChainFirst { first, second, .. } => {
1940 transform(first.as_mut(), seen_tees);
1941 transform(second.as_mut(), seen_tees);
1942 }
1943
1944 HydroNode::CrossSingleton { left, right, .. }
1945 | HydroNode::CrossProduct { left, right, .. }
1946 | HydroNode::Join { left, right, .. } => {
1947 transform(left.as_mut(), seen_tees);
1948 transform(right.as_mut(), seen_tees);
1949 }
1950
1951 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1952 transform(pos.as_mut(), seen_tees);
1953 transform(neg.as_mut(), seen_tees);
1954 }
1955
1956 HydroNode::ReduceKeyedWatermark {
1957 input, watermark, ..
1958 } => {
1959 transform(input.as_mut(), seen_tees);
1960 transform(watermark.as_mut(), seen_tees);
1961 }
1962
1963 HydroNode::Map { input, .. }
1964 | HydroNode::ResolveFutures { input, .. }
1965 | HydroNode::ResolveFuturesOrdered { input, .. }
1966 | HydroNode::FlatMap { input, .. }
1967 | HydroNode::Filter { input, .. }
1968 | HydroNode::FilterMap { input, .. }
1969 | HydroNode::Sort { input, .. }
1970 | HydroNode::DeferTick { input, .. }
1971 | HydroNode::Enumerate { input, .. }
1972 | HydroNode::Inspect { input, .. }
1973 | HydroNode::Unique { input, .. }
1974 | HydroNode::Network { input, .. }
1975 | HydroNode::Fold { input, .. }
1976 | HydroNode::Scan { input, .. }
1977 | HydroNode::FoldKeyed { input, .. }
1978 | HydroNode::Reduce { input, .. }
1979 | HydroNode::ReduceKeyed { input, .. }
1980 | HydroNode::Counter { input, .. } => {
1981 transform(input.as_mut(), seen_tees);
1982 }
1983 }
1984 }
1985
1986 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1987 match self {
1988 HydroNode::Placeholder => HydroNode::Placeholder,
1989 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1990 inner: Box::new(inner.deep_clone(seen_tees)),
1991 metadata: metadata.clone(),
1992 },
1993 HydroNode::ObserveNonDet {
1994 inner,
1995 trusted,
1996 metadata,
1997 } => HydroNode::ObserveNonDet {
1998 inner: Box::new(inner.deep_clone(seen_tees)),
1999 trusted: *trusted,
2000 metadata: metadata.clone(),
2001 },
2002 HydroNode::Source { source, metadata } => HydroNode::Source {
2003 source: source.clone(),
2004 metadata: metadata.clone(),
2005 },
2006 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
2007 value: value.clone(),
2008 metadata: metadata.clone(),
2009 },
2010 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2011 cycle_id: *cycle_id,
2012 metadata: metadata.clone(),
2013 },
2014 HydroNode::Tee { inner, metadata } => {
2015 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2016 HydroNode::Tee {
2017 inner: TeeNode(transformed.clone()),
2018 metadata: metadata.clone(),
2019 }
2020 } else {
2021 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2022 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2023 let cloned = inner.0.borrow().deep_clone(seen_tees);
2024 *new_rc.borrow_mut() = cloned;
2025 HydroNode::Tee {
2026 inner: TeeNode(new_rc),
2027 metadata: metadata.clone(),
2028 }
2029 }
2030 }
2031 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2032 inner: Box::new(inner.deep_clone(seen_tees)),
2033 metadata: metadata.clone(),
2034 },
2035 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2036 inner: Box::new(inner.deep_clone(seen_tees)),
2037 metadata: metadata.clone(),
2038 },
2039 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2040 inner: Box::new(inner.deep_clone(seen_tees)),
2041 metadata: metadata.clone(),
2042 },
2043 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2044 inner: Box::new(inner.deep_clone(seen_tees)),
2045 metadata: metadata.clone(),
2046 },
2047 HydroNode::Chain {
2048 first,
2049 second,
2050 metadata,
2051 } => HydroNode::Chain {
2052 first: Box::new(first.deep_clone(seen_tees)),
2053 second: Box::new(second.deep_clone(seen_tees)),
2054 metadata: metadata.clone(),
2055 },
2056 HydroNode::ChainFirst {
2057 first,
2058 second,
2059 metadata,
2060 } => HydroNode::ChainFirst {
2061 first: Box::new(first.deep_clone(seen_tees)),
2062 second: Box::new(second.deep_clone(seen_tees)),
2063 metadata: metadata.clone(),
2064 },
2065 HydroNode::CrossProduct {
2066 left,
2067 right,
2068 metadata,
2069 } => HydroNode::CrossProduct {
2070 left: Box::new(left.deep_clone(seen_tees)),
2071 right: Box::new(right.deep_clone(seen_tees)),
2072 metadata: metadata.clone(),
2073 },
2074 HydroNode::CrossSingleton {
2075 left,
2076 right,
2077 metadata,
2078 } => HydroNode::CrossSingleton {
2079 left: Box::new(left.deep_clone(seen_tees)),
2080 right: Box::new(right.deep_clone(seen_tees)),
2081 metadata: metadata.clone(),
2082 },
2083 HydroNode::Join {
2084 left,
2085 right,
2086 metadata,
2087 } => HydroNode::Join {
2088 left: Box::new(left.deep_clone(seen_tees)),
2089 right: Box::new(right.deep_clone(seen_tees)),
2090 metadata: metadata.clone(),
2091 },
2092 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2093 pos: Box::new(pos.deep_clone(seen_tees)),
2094 neg: Box::new(neg.deep_clone(seen_tees)),
2095 metadata: metadata.clone(),
2096 },
2097 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2098 pos: Box::new(pos.deep_clone(seen_tees)),
2099 neg: Box::new(neg.deep_clone(seen_tees)),
2100 metadata: metadata.clone(),
2101 },
2102 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2103 input: Box::new(input.deep_clone(seen_tees)),
2104 metadata: metadata.clone(),
2105 },
2106 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2107 HydroNode::ResolveFuturesOrdered {
2108 input: Box::new(input.deep_clone(seen_tees)),
2109 metadata: metadata.clone(),
2110 }
2111 }
2112 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2113 f: f.clone(),
2114 input: Box::new(input.deep_clone(seen_tees)),
2115 metadata: metadata.clone(),
2116 },
2117 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2118 f: f.clone(),
2119 input: Box::new(input.deep_clone(seen_tees)),
2120 metadata: metadata.clone(),
2121 },
2122 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2123 f: f.clone(),
2124 input: Box::new(input.deep_clone(seen_tees)),
2125 metadata: metadata.clone(),
2126 },
2127 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2128 f: f.clone(),
2129 input: Box::new(input.deep_clone(seen_tees)),
2130 metadata: metadata.clone(),
2131 },
2132 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2133 input: Box::new(input.deep_clone(seen_tees)),
2134 metadata: metadata.clone(),
2135 },
2136 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2137 input: Box::new(input.deep_clone(seen_tees)),
2138 metadata: metadata.clone(),
2139 },
2140 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2141 f: f.clone(),
2142 input: Box::new(input.deep_clone(seen_tees)),
2143 metadata: metadata.clone(),
2144 },
2145 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2146 input: Box::new(input.deep_clone(seen_tees)),
2147 metadata: metadata.clone(),
2148 },
2149 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2150 input: Box::new(input.deep_clone(seen_tees)),
2151 metadata: metadata.clone(),
2152 },
2153 HydroNode::Fold {
2154 init,
2155 acc,
2156 input,
2157 metadata,
2158 } => HydroNode::Fold {
2159 init: init.clone(),
2160 acc: acc.clone(),
2161 input: Box::new(input.deep_clone(seen_tees)),
2162 metadata: metadata.clone(),
2163 },
2164 HydroNode::Scan {
2165 init,
2166 acc,
2167 input,
2168 metadata,
2169 } => HydroNode::Scan {
2170 init: init.clone(),
2171 acc: acc.clone(),
2172 input: Box::new(input.deep_clone(seen_tees)),
2173 metadata: metadata.clone(),
2174 },
2175 HydroNode::FoldKeyed {
2176 init,
2177 acc,
2178 input,
2179 metadata,
2180 } => HydroNode::FoldKeyed {
2181 init: init.clone(),
2182 acc: acc.clone(),
2183 input: Box::new(input.deep_clone(seen_tees)),
2184 metadata: metadata.clone(),
2185 },
2186 HydroNode::ReduceKeyedWatermark {
2187 f,
2188 input,
2189 watermark,
2190 metadata,
2191 } => HydroNode::ReduceKeyedWatermark {
2192 f: f.clone(),
2193 input: Box::new(input.deep_clone(seen_tees)),
2194 watermark: Box::new(watermark.deep_clone(seen_tees)),
2195 metadata: metadata.clone(),
2196 },
2197 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2198 f: f.clone(),
2199 input: Box::new(input.deep_clone(seen_tees)),
2200 metadata: metadata.clone(),
2201 },
2202 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2203 f: f.clone(),
2204 input: Box::new(input.deep_clone(seen_tees)),
2205 metadata: metadata.clone(),
2206 },
2207 HydroNode::Network {
2208 name,
2209 serialize_fn,
2210 instantiate_fn,
2211 deserialize_fn,
2212 input,
2213 metadata,
2214 } => HydroNode::Network {
2215 name: name.clone(),
2216 serialize_fn: serialize_fn.clone(),
2217 instantiate_fn: instantiate_fn.clone(),
2218 deserialize_fn: deserialize_fn.clone(),
2219 input: Box::new(input.deep_clone(seen_tees)),
2220 metadata: metadata.clone(),
2221 },
2222 HydroNode::ExternalInput {
2223 from_external_key,
2224 from_port_id,
2225 from_many,
2226 codec_type,
2227 port_hint,
2228 instantiate_fn,
2229 deserialize_fn,
2230 metadata,
2231 } => HydroNode::ExternalInput {
2232 from_external_key: *from_external_key,
2233 from_port_id: *from_port_id,
2234 from_many: *from_many,
2235 codec_type: codec_type.clone(),
2236 port_hint: *port_hint,
2237 instantiate_fn: instantiate_fn.clone(),
2238 deserialize_fn: deserialize_fn.clone(),
2239 metadata: metadata.clone(),
2240 },
2241 HydroNode::Counter {
2242 tag,
2243 duration,
2244 prefix,
2245 input,
2246 metadata,
2247 } => HydroNode::Counter {
2248 tag: tag.clone(),
2249 duration: duration.clone(),
2250 prefix: prefix.clone(),
2251 input: Box::new(input.deep_clone(seen_tees)),
2252 metadata: metadata.clone(),
2253 },
2254 }
2255 }
2256
2257 #[cfg(feature = "build")]
2258 pub fn emit_core(
2259 &mut self,
2260 builders_or_callback: &mut BuildersOrCallback<
2261 impl FnMut(&mut HydroRoot, &mut usize),
2262 impl FnMut(&mut HydroNode, &mut usize),
2263 >,
2264 seen_tees: &mut SeenTees,
2265 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2266 next_stmt_id: &mut usize,
2267 ) -> syn::Ident {
2268 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2269
2270 self.transform_bottom_up(
2271 &mut |node: &mut HydroNode| {
2272 let out_location = node.metadata().location_id.clone();
2273 match node {
2274 HydroNode::Placeholder => {
2275 panic!()
2276 }
2277
2278 HydroNode::Cast { .. } => {
2279 match builders_or_callback {
2282 BuildersOrCallback::Builders(_) => {}
2283 BuildersOrCallback::Callback(_, node_callback) => {
2284 node_callback(node, next_stmt_id);
2285 }
2286 }
2287
2288 *next_stmt_id += 1;
2289 }
2291
2292 HydroNode::ObserveNonDet {
2293 inner,
2294 trusted,
2295 metadata,
2296 ..
2297 } => {
2298 let inner_ident = ident_stack.pop().unwrap();
2299
2300 let observe_ident =
2301 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2302
2303 match builders_or_callback {
2304 BuildersOrCallback::Builders(graph_builders) => {
2305 graph_builders.observe_nondet(
2306 *trusted,
2307 &inner.metadata().location_id,
2308 inner_ident,
2309 &inner.metadata().collection_kind,
2310 &observe_ident,
2311 &metadata.collection_kind,
2312 &metadata.op,
2313 );
2314 }
2315 BuildersOrCallback::Callback(_, node_callback) => {
2316 node_callback(node, next_stmt_id);
2317 }
2318 }
2319
2320 *next_stmt_id += 1;
2321
2322 ident_stack.push(observe_ident);
2323 }
2324
2325 HydroNode::Batch {
2326 inner, metadata, ..
2327 } => {
2328 let inner_ident = ident_stack.pop().unwrap();
2329
2330 let batch_ident =
2331 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2332
2333 match builders_or_callback {
2334 BuildersOrCallback::Builders(graph_builders) => {
2335 graph_builders.batch(
2336 inner_ident,
2337 &inner.metadata().location_id,
2338 &inner.metadata().collection_kind,
2339 &batch_ident,
2340 &out_location,
2341 &metadata.op,
2342 );
2343 }
2344 BuildersOrCallback::Callback(_, node_callback) => {
2345 node_callback(node, next_stmt_id);
2346 }
2347 }
2348
2349 *next_stmt_id += 1;
2350
2351 ident_stack.push(batch_ident);
2352 }
2353
2354 HydroNode::YieldConcat { inner, .. } => {
2355 let inner_ident = ident_stack.pop().unwrap();
2356
2357 let yield_ident =
2358 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2359
2360 match builders_or_callback {
2361 BuildersOrCallback::Builders(graph_builders) => {
2362 graph_builders.yield_from_tick(
2363 inner_ident,
2364 &inner.metadata().location_id,
2365 &inner.metadata().collection_kind,
2366 &yield_ident,
2367 &out_location,
2368 );
2369 }
2370 BuildersOrCallback::Callback(_, node_callback) => {
2371 node_callback(node, next_stmt_id);
2372 }
2373 }
2374
2375 *next_stmt_id += 1;
2376
2377 ident_stack.push(yield_ident);
2378 }
2379
2380 HydroNode::BeginAtomic { inner, metadata } => {
2381 let inner_ident = ident_stack.pop().unwrap();
2382
2383 let begin_ident =
2384 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2385
2386 match builders_or_callback {
2387 BuildersOrCallback::Builders(graph_builders) => {
2388 graph_builders.begin_atomic(
2389 inner_ident,
2390 &inner.metadata().location_id,
2391 &inner.metadata().collection_kind,
2392 &begin_ident,
2393 &out_location,
2394 &metadata.op,
2395 );
2396 }
2397 BuildersOrCallback::Callback(_, node_callback) => {
2398 node_callback(node, next_stmt_id);
2399 }
2400 }
2401
2402 *next_stmt_id += 1;
2403
2404 ident_stack.push(begin_ident);
2405 }
2406
2407 HydroNode::EndAtomic { inner, .. } => {
2408 let inner_ident = ident_stack.pop().unwrap();
2409
2410 let end_ident =
2411 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2412
2413 match builders_or_callback {
2414 BuildersOrCallback::Builders(graph_builders) => {
2415 graph_builders.end_atomic(
2416 inner_ident,
2417 &inner.metadata().location_id,
2418 &inner.metadata().collection_kind,
2419 &end_ident,
2420 );
2421 }
2422 BuildersOrCallback::Callback(_, node_callback) => {
2423 node_callback(node, next_stmt_id);
2424 }
2425 }
2426
2427 *next_stmt_id += 1;
2428
2429 ident_stack.push(end_ident);
2430 }
2431
2432 HydroNode::Source {
2433 source, metadata, ..
2434 } => {
2435 if let HydroSource::ExternalNetwork() = source {
2436 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2437 } else {
2438 let source_ident =
2439 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2440
2441 let source_stmt = match source {
2442 HydroSource::Stream(expr) => {
2443 debug_assert!(metadata.location_id.is_top_level());
2444 parse_quote! {
2445 #source_ident = source_stream(#expr);
2446 }
2447 }
2448
2449 HydroSource::ExternalNetwork() => {
2450 unreachable!()
2451 }
2452
2453 HydroSource::Iter(expr) => {
2454 if metadata.location_id.is_top_level() {
2455 parse_quote! {
2456 #source_ident = source_iter(#expr);
2457 }
2458 } else {
2459 parse_quote! {
2461 #source_ident = source_iter(#expr) -> persist::<'static>();
2462 }
2463 }
2464 }
2465
2466 HydroSource::Spin() => {
2467 debug_assert!(metadata.location_id.is_top_level());
2468 parse_quote! {
2469 #source_ident = spin();
2470 }
2471 }
2472
2473 HydroSource::ClusterMembers(target_loc, state) => {
2474 debug_assert!(metadata.location_id.is_top_level());
2475
2476 let members_tee_ident = syn::Ident::new(
2477 &format!(
2478 "__cluster_members_tee_{}_{}",
2479 metadata.location_id.root().key(),
2480 target_loc.key(),
2481 ),
2482 Span::call_site(),
2483 );
2484
2485 match state {
2486 ClusterMembersState::Stream(d) => {
2487 parse_quote! {
2488 #members_tee_ident = source_stream(#d) -> tee();
2489 #source_ident = #members_tee_ident;
2490 }
2491 },
2492 ClusterMembersState::Uninit => syn::parse_quote! {
2493 #source_ident = source_stream(DUMMY);
2494 },
2495 ClusterMembersState::Tee(..) => parse_quote! {
2496 #source_ident = #members_tee_ident;
2497 },
2498 }
2499 }
2500
2501 HydroSource::Embedded(ident) => {
2502 parse_quote! {
2503 #source_ident = source_stream(#ident);
2504 }
2505 }
2506 };
2507
2508 match builders_or_callback {
2509 BuildersOrCallback::Builders(graph_builders) => {
2510 let builder = graph_builders.get_dfir_mut(&out_location);
2511 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2512 }
2513 BuildersOrCallback::Callback(_, node_callback) => {
2514 node_callback(node, next_stmt_id);
2515 }
2516 }
2517
2518 *next_stmt_id += 1;
2519
2520 ident_stack.push(source_ident);
2521 }
2522 }
2523
2524 HydroNode::SingletonSource { value, metadata } => {
2525 let source_ident =
2526 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2527
2528 match builders_or_callback {
2529 BuildersOrCallback::Builders(graph_builders) => {
2530 let builder = graph_builders.get_dfir_mut(&out_location);
2531
2532 if metadata.location_id.is_top_level()
2533 && metadata.collection_kind.is_bounded()
2534 {
2535 builder.add_dfir(
2536 parse_quote! {
2537 #source_ident = source_iter([#value]);
2538 },
2539 None,
2540 Some(&next_stmt_id.to_string()),
2541 );
2542 } else {
2543 builder.add_dfir(
2544 parse_quote! {
2545 #source_ident = source_iter([#value]) -> persist::<'static>();
2546 },
2547 None,
2548 Some(&next_stmt_id.to_string()),
2549 );
2550 }
2551 }
2552 BuildersOrCallback::Callback(_, node_callback) => {
2553 node_callback(node, next_stmt_id);
2554 }
2555 }
2556
2557 *next_stmt_id += 1;
2558
2559 ident_stack.push(source_ident);
2560 }
2561
2562 HydroNode::CycleSource { cycle_id, .. } => {
2563 let ident = cycle_id.as_ident();
2564
2565 match builders_or_callback {
2566 BuildersOrCallback::Builders(_) => {}
2567 BuildersOrCallback::Callback(_, node_callback) => {
2568 node_callback(node, next_stmt_id);
2569 }
2570 }
2571
2572 *next_stmt_id += 1;
2574
2575 ident_stack.push(ident);
2576 }
2577
2578 HydroNode::Tee { inner, .. } => {
2579 let ret_ident = if let Some(teed_from) =
2580 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2581 {
2582 match builders_or_callback {
2583 BuildersOrCallback::Builders(_) => {}
2584 BuildersOrCallback::Callback(_, node_callback) => {
2585 node_callback(node, next_stmt_id);
2586 }
2587 }
2588
2589 teed_from.clone()
2590 } else {
2591 let inner_ident = ident_stack.pop().unwrap();
2594
2595 let tee_ident =
2596 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2597
2598 built_tees.insert(
2599 inner.0.as_ref() as *const RefCell<HydroNode>,
2600 tee_ident.clone(),
2601 );
2602
2603 match builders_or_callback {
2604 BuildersOrCallback::Builders(graph_builders) => {
2605 let builder = graph_builders.get_dfir_mut(&out_location);
2606 builder.add_dfir(
2607 parse_quote! {
2608 #tee_ident = #inner_ident -> tee();
2609 },
2610 None,
2611 Some(&next_stmt_id.to_string()),
2612 );
2613 }
2614 BuildersOrCallback::Callback(_, node_callback) => {
2615 node_callback(node, next_stmt_id);
2616 }
2617 }
2618
2619 tee_ident
2620 };
2621
2622 *next_stmt_id += 1;
2626 ident_stack.push(ret_ident);
2627 }
2628
2629 HydroNode::Chain { .. } => {
2630 let second_ident = ident_stack.pop().unwrap();
2632 let first_ident = ident_stack.pop().unwrap();
2633
2634 let chain_ident =
2635 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2636
2637 match builders_or_callback {
2638 BuildersOrCallback::Builders(graph_builders) => {
2639 let builder = graph_builders.get_dfir_mut(&out_location);
2640 builder.add_dfir(
2641 parse_quote! {
2642 #chain_ident = chain();
2643 #first_ident -> [0]#chain_ident;
2644 #second_ident -> [1]#chain_ident;
2645 },
2646 None,
2647 Some(&next_stmt_id.to_string()),
2648 );
2649 }
2650 BuildersOrCallback::Callback(_, node_callback) => {
2651 node_callback(node, next_stmt_id);
2652 }
2653 }
2654
2655 *next_stmt_id += 1;
2656
2657 ident_stack.push(chain_ident);
2658 }
2659
2660 HydroNode::ChainFirst { .. } => {
2661 let second_ident = ident_stack.pop().unwrap();
2662 let first_ident = ident_stack.pop().unwrap();
2663
2664 let chain_ident =
2665 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2666
2667 match builders_or_callback {
2668 BuildersOrCallback::Builders(graph_builders) => {
2669 let builder = graph_builders.get_dfir_mut(&out_location);
2670 builder.add_dfir(
2671 parse_quote! {
2672 #chain_ident = chain_first_n(1);
2673 #first_ident -> [0]#chain_ident;
2674 #second_ident -> [1]#chain_ident;
2675 },
2676 None,
2677 Some(&next_stmt_id.to_string()),
2678 );
2679 }
2680 BuildersOrCallback::Callback(_, node_callback) => {
2681 node_callback(node, next_stmt_id);
2682 }
2683 }
2684
2685 *next_stmt_id += 1;
2686
2687 ident_stack.push(chain_ident);
2688 }
2689
2690 HydroNode::CrossSingleton { right, .. } => {
2691 let right_ident = ident_stack.pop().unwrap();
2692 let left_ident = ident_stack.pop().unwrap();
2693
2694 let cross_ident =
2695 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2696
2697 match builders_or_callback {
2698 BuildersOrCallback::Builders(graph_builders) => {
2699 let builder = graph_builders.get_dfir_mut(&out_location);
2700
2701 if right.metadata().location_id.is_top_level()
2702 && right.metadata().collection_kind.is_bounded()
2703 {
2704 builder.add_dfir(
2705 parse_quote! {
2706 #cross_ident = cross_singleton();
2707 #left_ident -> [input]#cross_ident;
2708 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2709 },
2710 None,
2711 Some(&next_stmt_id.to_string()),
2712 );
2713 } else {
2714 builder.add_dfir(
2715 parse_quote! {
2716 #cross_ident = cross_singleton();
2717 #left_ident -> [input]#cross_ident;
2718 #right_ident -> [single]#cross_ident;
2719 },
2720 None,
2721 Some(&next_stmt_id.to_string()),
2722 );
2723 }
2724 }
2725 BuildersOrCallback::Callback(_, node_callback) => {
2726 node_callback(node, next_stmt_id);
2727 }
2728 }
2729
2730 *next_stmt_id += 1;
2731
2732 ident_stack.push(cross_ident);
2733 }
2734
2735 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2736 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2737 parse_quote!(cross_join_multiset)
2738 } else {
2739 parse_quote!(join_multiset)
2740 };
2741
2742 let (HydroNode::CrossProduct { left, right, .. }
2743 | HydroNode::Join { left, right, .. }) = node
2744 else {
2745 unreachable!()
2746 };
2747
2748 let is_top_level = left.metadata().location_id.is_top_level()
2749 && right.metadata().location_id.is_top_level();
2750 let left_lifetime = if left.metadata().location_id.is_top_level() {
2751 quote!('static)
2752 } else {
2753 quote!('tick)
2754 };
2755
2756 let right_lifetime = if right.metadata().location_id.is_top_level() {
2757 quote!('static)
2758 } else {
2759 quote!('tick)
2760 };
2761
2762 let right_ident = ident_stack.pop().unwrap();
2763 let left_ident = ident_stack.pop().unwrap();
2764
2765 let stream_ident =
2766 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2767
2768 match builders_or_callback {
2769 BuildersOrCallback::Builders(graph_builders) => {
2770 let builder = graph_builders.get_dfir_mut(&out_location);
2771 builder.add_dfir(
2772 if is_top_level {
2773 parse_quote! {
2776 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2777 #left_ident -> [0]#stream_ident;
2778 #right_ident -> [1]#stream_ident;
2779 }
2780 } else {
2781 parse_quote! {
2782 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2783 #left_ident -> [0]#stream_ident;
2784 #right_ident -> [1]#stream_ident;
2785 }
2786 }
2787 ,
2788 None,
2789 Some(&next_stmt_id.to_string()),
2790 );
2791 }
2792 BuildersOrCallback::Callback(_, node_callback) => {
2793 node_callback(node, next_stmt_id);
2794 }
2795 }
2796
2797 *next_stmt_id += 1;
2798
2799 ident_stack.push(stream_ident);
2800 }
2801
2802 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2803 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2804 parse_quote!(difference)
2805 } else {
2806 parse_quote!(anti_join)
2807 };
2808
2809 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2810 node
2811 else {
2812 unreachable!()
2813 };
2814
2815 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2816 quote!('static)
2817 } else {
2818 quote!('tick)
2819 };
2820
2821 let neg_ident = ident_stack.pop().unwrap();
2822 let pos_ident = ident_stack.pop().unwrap();
2823
2824 let stream_ident =
2825 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2826
2827 match builders_or_callback {
2828 BuildersOrCallback::Builders(graph_builders) => {
2829 let builder = graph_builders.get_dfir_mut(&out_location);
2830 builder.add_dfir(
2831 parse_quote! {
2832 #stream_ident = #operator::<'tick, #neg_lifetime>();
2833 #pos_ident -> [pos]#stream_ident;
2834 #neg_ident -> [neg]#stream_ident;
2835 },
2836 None,
2837 Some(&next_stmt_id.to_string()),
2838 );
2839 }
2840 BuildersOrCallback::Callback(_, node_callback) => {
2841 node_callback(node, next_stmt_id);
2842 }
2843 }
2844
2845 *next_stmt_id += 1;
2846
2847 ident_stack.push(stream_ident);
2848 }
2849
2850 HydroNode::ResolveFutures { .. } => {
2851 let input_ident = ident_stack.pop().unwrap();
2852
2853 let futures_ident =
2854 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2855
2856 match builders_or_callback {
2857 BuildersOrCallback::Builders(graph_builders) => {
2858 let builder = graph_builders.get_dfir_mut(&out_location);
2859 builder.add_dfir(
2860 parse_quote! {
2861 #futures_ident = #input_ident -> resolve_futures();
2862 },
2863 None,
2864 Some(&next_stmt_id.to_string()),
2865 );
2866 }
2867 BuildersOrCallback::Callback(_, node_callback) => {
2868 node_callback(node, next_stmt_id);
2869 }
2870 }
2871
2872 *next_stmt_id += 1;
2873
2874 ident_stack.push(futures_ident);
2875 }
2876
2877 HydroNode::ResolveFuturesOrdered { .. } => {
2878 let input_ident = ident_stack.pop().unwrap();
2879
2880 let futures_ident =
2881 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2882
2883 match builders_or_callback {
2884 BuildersOrCallback::Builders(graph_builders) => {
2885 let builder = graph_builders.get_dfir_mut(&out_location);
2886 builder.add_dfir(
2887 parse_quote! {
2888 #futures_ident = #input_ident -> resolve_futures_ordered();
2889 },
2890 None,
2891 Some(&next_stmt_id.to_string()),
2892 );
2893 }
2894 BuildersOrCallback::Callback(_, node_callback) => {
2895 node_callback(node, next_stmt_id);
2896 }
2897 }
2898
2899 *next_stmt_id += 1;
2900
2901 ident_stack.push(futures_ident);
2902 }
2903
2904 HydroNode::Map { f, .. } => {
2905 let input_ident = ident_stack.pop().unwrap();
2906
2907 let map_ident =
2908 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2909
2910 match builders_or_callback {
2911 BuildersOrCallback::Builders(graph_builders) => {
2912 let builder = graph_builders.get_dfir_mut(&out_location);
2913 builder.add_dfir(
2914 parse_quote! {
2915 #map_ident = #input_ident -> map(#f);
2916 },
2917 None,
2918 Some(&next_stmt_id.to_string()),
2919 );
2920 }
2921 BuildersOrCallback::Callback(_, node_callback) => {
2922 node_callback(node, next_stmt_id);
2923 }
2924 }
2925
2926 *next_stmt_id += 1;
2927
2928 ident_stack.push(map_ident);
2929 }
2930
2931 HydroNode::FlatMap { f, .. } => {
2932 let input_ident = ident_stack.pop().unwrap();
2933
2934 let flat_map_ident =
2935 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2936
2937 match builders_or_callback {
2938 BuildersOrCallback::Builders(graph_builders) => {
2939 let builder = graph_builders.get_dfir_mut(&out_location);
2940 builder.add_dfir(
2941 parse_quote! {
2942 #flat_map_ident = #input_ident -> flat_map(#f);
2943 },
2944 None,
2945 Some(&next_stmt_id.to_string()),
2946 );
2947 }
2948 BuildersOrCallback::Callback(_, node_callback) => {
2949 node_callback(node, next_stmt_id);
2950 }
2951 }
2952
2953 *next_stmt_id += 1;
2954
2955 ident_stack.push(flat_map_ident);
2956 }
2957
2958 HydroNode::Filter { f, .. } => {
2959 let input_ident = ident_stack.pop().unwrap();
2960
2961 let filter_ident =
2962 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2963
2964 match builders_or_callback {
2965 BuildersOrCallback::Builders(graph_builders) => {
2966 let builder = graph_builders.get_dfir_mut(&out_location);
2967 builder.add_dfir(
2968 parse_quote! {
2969 #filter_ident = #input_ident -> filter(#f);
2970 },
2971 None,
2972 Some(&next_stmt_id.to_string()),
2973 );
2974 }
2975 BuildersOrCallback::Callback(_, node_callback) => {
2976 node_callback(node, next_stmt_id);
2977 }
2978 }
2979
2980 *next_stmt_id += 1;
2981
2982 ident_stack.push(filter_ident);
2983 }
2984
2985 HydroNode::FilterMap { f, .. } => {
2986 let input_ident = ident_stack.pop().unwrap();
2987
2988 let filter_map_ident =
2989 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2990
2991 match builders_or_callback {
2992 BuildersOrCallback::Builders(graph_builders) => {
2993 let builder = graph_builders.get_dfir_mut(&out_location);
2994 builder.add_dfir(
2995 parse_quote! {
2996 #filter_map_ident = #input_ident -> filter_map(#f);
2997 },
2998 None,
2999 Some(&next_stmt_id.to_string()),
3000 );
3001 }
3002 BuildersOrCallback::Callback(_, node_callback) => {
3003 node_callback(node, next_stmt_id);
3004 }
3005 }
3006
3007 *next_stmt_id += 1;
3008
3009 ident_stack.push(filter_map_ident);
3010 }
3011
3012 HydroNode::Sort { .. } => {
3013 let input_ident = ident_stack.pop().unwrap();
3014
3015 let sort_ident =
3016 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3017
3018 match builders_or_callback {
3019 BuildersOrCallback::Builders(graph_builders) => {
3020 let builder = graph_builders.get_dfir_mut(&out_location);
3021 builder.add_dfir(
3022 parse_quote! {
3023 #sort_ident = #input_ident -> sort();
3024 },
3025 None,
3026 Some(&next_stmt_id.to_string()),
3027 );
3028 }
3029 BuildersOrCallback::Callback(_, node_callback) => {
3030 node_callback(node, next_stmt_id);
3031 }
3032 }
3033
3034 *next_stmt_id += 1;
3035
3036 ident_stack.push(sort_ident);
3037 }
3038
3039 HydroNode::DeferTick { .. } => {
3040 let input_ident = ident_stack.pop().unwrap();
3041
3042 let defer_tick_ident =
3043 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3044
3045 match builders_or_callback {
3046 BuildersOrCallback::Builders(graph_builders) => {
3047 let builder = graph_builders.get_dfir_mut(&out_location);
3048 builder.add_dfir(
3049 parse_quote! {
3050 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3051 },
3052 None,
3053 Some(&next_stmt_id.to_string()),
3054 );
3055 }
3056 BuildersOrCallback::Callback(_, node_callback) => {
3057 node_callback(node, next_stmt_id);
3058 }
3059 }
3060
3061 *next_stmt_id += 1;
3062
3063 ident_stack.push(defer_tick_ident);
3064 }
3065
3066 HydroNode::Enumerate { input, .. } => {
3067 let input_ident = ident_stack.pop().unwrap();
3068
3069 let enumerate_ident =
3070 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3071
3072 match builders_or_callback {
3073 BuildersOrCallback::Builders(graph_builders) => {
3074 let builder = graph_builders.get_dfir_mut(&out_location);
3075 let lifetime = if input.metadata().location_id.is_top_level() {
3076 quote!('static)
3077 } else {
3078 quote!('tick)
3079 };
3080 builder.add_dfir(
3081 parse_quote! {
3082 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3083 },
3084 None,
3085 Some(&next_stmt_id.to_string()),
3086 );
3087 }
3088 BuildersOrCallback::Callback(_, node_callback) => {
3089 node_callback(node, next_stmt_id);
3090 }
3091 }
3092
3093 *next_stmt_id += 1;
3094
3095 ident_stack.push(enumerate_ident);
3096 }
3097
3098 HydroNode::Inspect { f, .. } => {
3099 let input_ident = ident_stack.pop().unwrap();
3100
3101 let inspect_ident =
3102 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3103
3104 match builders_or_callback {
3105 BuildersOrCallback::Builders(graph_builders) => {
3106 let builder = graph_builders.get_dfir_mut(&out_location);
3107 builder.add_dfir(
3108 parse_quote! {
3109 #inspect_ident = #input_ident -> inspect(#f);
3110 },
3111 None,
3112 Some(&next_stmt_id.to_string()),
3113 );
3114 }
3115 BuildersOrCallback::Callback(_, node_callback) => {
3116 node_callback(node, next_stmt_id);
3117 }
3118 }
3119
3120 *next_stmt_id += 1;
3121
3122 ident_stack.push(inspect_ident);
3123 }
3124
3125 HydroNode::Unique { input, .. } => {
3126 let input_ident = ident_stack.pop().unwrap();
3127
3128 let unique_ident =
3129 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3130
3131 match builders_or_callback {
3132 BuildersOrCallback::Builders(graph_builders) => {
3133 let builder = graph_builders.get_dfir_mut(&out_location);
3134 let lifetime = if input.metadata().location_id.is_top_level() {
3135 quote!('static)
3136 } else {
3137 quote!('tick)
3138 };
3139
3140 builder.add_dfir(
3141 parse_quote! {
3142 #unique_ident = #input_ident -> unique::<#lifetime>();
3143 },
3144 None,
3145 Some(&next_stmt_id.to_string()),
3146 );
3147 }
3148 BuildersOrCallback::Callback(_, node_callback) => {
3149 node_callback(node, next_stmt_id);
3150 }
3151 }
3152
3153 *next_stmt_id += 1;
3154
3155 ident_stack.push(unique_ident);
3156 }
3157
3158 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3159 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3160 if input.metadata().location_id.is_top_level()
3161 && input.metadata().collection_kind.is_bounded()
3162 {
3163 parse_quote!(fold_no_replay)
3164 } else {
3165 parse_quote!(fold)
3166 }
3167 } else if matches!(node, HydroNode::Scan { .. }) {
3168 parse_quote!(scan)
3169 } else if let HydroNode::FoldKeyed { input, .. } = node {
3170 if input.metadata().location_id.is_top_level()
3171 && input.metadata().collection_kind.is_bounded()
3172 {
3173 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3174 } else {
3175 parse_quote!(fold_keyed)
3176 }
3177 } else {
3178 unreachable!()
3179 };
3180
3181 let (HydroNode::Fold { input, .. }
3182 | HydroNode::FoldKeyed { input, .. }
3183 | HydroNode::Scan { input, .. }) = node
3184 else {
3185 unreachable!()
3186 };
3187
3188 let lifetime = if input.metadata().location_id.is_top_level() {
3189 quote!('static)
3190 } else {
3191 quote!('tick)
3192 };
3193
3194 let input_ident = ident_stack.pop().unwrap();
3195
3196 let (HydroNode::Fold { init, acc, .. }
3197 | HydroNode::FoldKeyed { init, acc, .. }
3198 | HydroNode::Scan { init, acc, .. }) = &*node
3199 else {
3200 unreachable!()
3201 };
3202
3203 let fold_ident =
3204 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3205
3206 match builders_or_callback {
3207 BuildersOrCallback::Builders(graph_builders) => {
3208 if matches!(node, HydroNode::Fold { .. })
3209 && node.metadata().location_id.is_top_level()
3210 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3211 && graph_builders.singleton_intermediates()
3212 && !node.metadata().collection_kind.is_bounded()
3213 {
3214 let builder = graph_builders.get_dfir_mut(&out_location);
3215
3216 let acc: syn::Expr = parse_quote!({
3217 let mut __inner = #acc;
3218 move |__state, __value| {
3219 __inner(__state, __value);
3220 Some(__state.clone())
3221 }
3222 });
3223
3224 builder.add_dfir(
3225 parse_quote! {
3226 source_iter([(#init)()]) -> [0]#fold_ident;
3227 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3228 #fold_ident = chain();
3229 },
3230 None,
3231 Some(&next_stmt_id.to_string()),
3232 );
3233 } else if matches!(node, HydroNode::FoldKeyed { .. })
3234 && node.metadata().location_id.is_top_level()
3235 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3236 && graph_builders.singleton_intermediates()
3237 && !node.metadata().collection_kind.is_bounded()
3238 {
3239 let builder = graph_builders.get_dfir_mut(&out_location);
3240
3241 let acc: syn::Expr = parse_quote!({
3242 let mut __init = #init;
3243 let mut __inner = #acc;
3244 move |__state, __kv: (_, _)| {
3245 let __state = __state
3247 .entry(::std::clone::Clone::clone(&__kv.0))
3248 .or_insert_with(|| (__init)());
3249 __inner(__state, __kv.1);
3250 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3251 }
3252 });
3253
3254 builder.add_dfir(
3255 parse_quote! {
3256 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3257 },
3258 None,
3259 Some(&next_stmt_id.to_string()),
3260 );
3261 } else {
3262 let builder = graph_builders.get_dfir_mut(&out_location);
3263 builder.add_dfir(
3264 parse_quote! {
3265 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3266 },
3267 None,
3268 Some(&next_stmt_id.to_string()),
3269 );
3270 }
3271 }
3272 BuildersOrCallback::Callback(_, node_callback) => {
3273 node_callback(node, next_stmt_id);
3274 }
3275 }
3276
3277 *next_stmt_id += 1;
3278
3279 ident_stack.push(fold_ident);
3280 }
3281
3282 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3283 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3284 if input.metadata().location_id.is_top_level()
3285 && input.metadata().collection_kind.is_bounded()
3286 {
3287 parse_quote!(reduce_no_replay)
3288 } else {
3289 parse_quote!(reduce)
3290 }
3291 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3292 if input.metadata().location_id.is_top_level()
3293 && input.metadata().collection_kind.is_bounded()
3294 {
3295 todo!(
3296 "Calling keyed reduce on a top-level bounded collection is not supported"
3297 )
3298 } else {
3299 parse_quote!(reduce_keyed)
3300 }
3301 } else {
3302 unreachable!()
3303 };
3304
3305 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3306 else {
3307 unreachable!()
3308 };
3309
3310 let lifetime = if input.metadata().location_id.is_top_level() {
3311 quote!('static)
3312 } else {
3313 quote!('tick)
3314 };
3315
3316 let input_ident = ident_stack.pop().unwrap();
3317
3318 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3319 else {
3320 unreachable!()
3321 };
3322
3323 let reduce_ident =
3324 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3325
3326 match builders_or_callback {
3327 BuildersOrCallback::Builders(graph_builders) => {
3328 if matches!(node, HydroNode::Reduce { .. })
3329 && node.metadata().location_id.is_top_level()
3330 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3331 && graph_builders.singleton_intermediates()
3332 && !node.metadata().collection_kind.is_bounded()
3333 {
3334 todo!(
3335 "Reduce with optional intermediates is not yet supported in simulator"
3336 );
3337 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3338 && node.metadata().location_id.is_top_level()
3339 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3340 && graph_builders.singleton_intermediates()
3341 && !node.metadata().collection_kind.is_bounded()
3342 {
3343 todo!(
3344 "Reduce keyed with optional intermediates is not yet supported in simulator"
3345 );
3346 } else {
3347 let builder = graph_builders.get_dfir_mut(&out_location);
3348 builder.add_dfir(
3349 parse_quote! {
3350 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3351 },
3352 None,
3353 Some(&next_stmt_id.to_string()),
3354 );
3355 }
3356 }
3357 BuildersOrCallback::Callback(_, node_callback) => {
3358 node_callback(node, next_stmt_id);
3359 }
3360 }
3361
3362 *next_stmt_id += 1;
3363
3364 ident_stack.push(reduce_ident);
3365 }
3366
3367 HydroNode::ReduceKeyedWatermark {
3368 f,
3369 input,
3370 metadata,
3371 ..
3372 } => {
3373 let lifetime = if input.metadata().location_id.is_top_level() {
3374 quote!('static)
3375 } else {
3376 quote!('tick)
3377 };
3378
3379 let watermark_ident = ident_stack.pop().unwrap();
3381 let input_ident = ident_stack.pop().unwrap();
3382
3383 let chain_ident = syn::Ident::new(
3384 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3385 Span::call_site(),
3386 );
3387
3388 let fold_ident =
3389 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3390
3391 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3392 && input.metadata().collection_kind.is_bounded()
3393 {
3394 parse_quote!(fold_no_replay)
3395 } else {
3396 parse_quote!(fold)
3397 };
3398
3399 match builders_or_callback {
3400 BuildersOrCallback::Builders(graph_builders) => {
3401 if metadata.location_id.is_top_level()
3402 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3403 && graph_builders.singleton_intermediates()
3404 && !metadata.collection_kind.is_bounded()
3405 {
3406 todo!(
3407 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3408 )
3409 } else {
3410 let builder = graph_builders.get_dfir_mut(&out_location);
3411 builder.add_dfir(
3412 parse_quote! {
3413 #chain_ident = chain();
3414 #input_ident
3415 -> map(|x| (Some(x), None))
3416 -> [0]#chain_ident;
3417 #watermark_ident
3418 -> map(|watermark| (None, Some(watermark)))
3419 -> [1]#chain_ident;
3420
3421 #fold_ident = #chain_ident
3422 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3423 let __reduce_keyed_fn = #f;
3424 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3425 if let Some((k, v)) = opt_payload {
3426 if let Some(curr_watermark) = *opt_curr_watermark {
3427 if k <= curr_watermark {
3428 return;
3429 }
3430 }
3431 match map.entry(k) {
3432 ::std::collections::hash_map::Entry::Vacant(e) => {
3433 e.insert(v);
3434 }
3435 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3436 __reduce_keyed_fn(e.get_mut(), v);
3437 }
3438 }
3439 } else {
3440 let watermark = opt_watermark.unwrap();
3441 if let Some(curr_watermark) = *opt_curr_watermark {
3442 if watermark <= curr_watermark {
3443 return;
3444 }
3445 }
3446 *opt_curr_watermark = opt_watermark;
3447 map.retain(|k, _| *k > watermark);
3448 }
3449 }
3450 })
3451 -> flat_map(|(map, _curr_watermark)| map);
3452 },
3453 None,
3454 Some(&next_stmt_id.to_string()),
3455 );
3456 }
3457 }
3458 BuildersOrCallback::Callback(_, node_callback) => {
3459 node_callback(node, next_stmt_id);
3460 }
3461 }
3462
3463 *next_stmt_id += 1;
3464
3465 ident_stack.push(fold_ident);
3466 }
3467
3468 HydroNode::Network {
3469 serialize_fn: serialize_pipeline,
3470 instantiate_fn,
3471 deserialize_fn: deserialize_pipeline,
3472 input,
3473 ..
3474 } => {
3475 let input_ident = ident_stack.pop().unwrap();
3476
3477 let receiver_stream_ident =
3478 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3479
3480 match builders_or_callback {
3481 BuildersOrCallback::Builders(graph_builders) => {
3482 let (sink_expr, source_expr) = match instantiate_fn {
3483 DebugInstantiate::Building => (
3484 syn::parse_quote!(DUMMY_SINK),
3485 syn::parse_quote!(DUMMY_SOURCE),
3486 ),
3487
3488 DebugInstantiate::Finalized(finalized) => {
3489 (finalized.sink.clone(), finalized.source.clone())
3490 }
3491 };
3492
3493 graph_builders.create_network(
3494 &input.metadata().location_id,
3495 &out_location,
3496 input_ident,
3497 &receiver_stream_ident,
3498 serialize_pipeline.as_ref(),
3499 sink_expr,
3500 source_expr,
3501 deserialize_pipeline.as_ref(),
3502 *next_stmt_id,
3503 );
3504 }
3505 BuildersOrCallback::Callback(_, node_callback) => {
3506 node_callback(node, next_stmt_id);
3507 }
3508 }
3509
3510 *next_stmt_id += 1;
3511
3512 ident_stack.push(receiver_stream_ident);
3513 }
3514
3515 HydroNode::ExternalInput {
3516 instantiate_fn,
3517 deserialize_fn: deserialize_pipeline,
3518 ..
3519 } => {
3520 let receiver_stream_ident =
3521 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3522
3523 match builders_or_callback {
3524 BuildersOrCallback::Builders(graph_builders) => {
3525 let (_, source_expr) = match instantiate_fn {
3526 DebugInstantiate::Building => (
3527 syn::parse_quote!(DUMMY_SINK),
3528 syn::parse_quote!(DUMMY_SOURCE),
3529 ),
3530
3531 DebugInstantiate::Finalized(finalized) => {
3532 (finalized.sink.clone(), finalized.source.clone())
3533 }
3534 };
3535
3536 graph_builders.create_external_source(
3537 &out_location,
3538 source_expr,
3539 &receiver_stream_ident,
3540 deserialize_pipeline.as_ref(),
3541 *next_stmt_id,
3542 );
3543 }
3544 BuildersOrCallback::Callback(_, node_callback) => {
3545 node_callback(node, next_stmt_id);
3546 }
3547 }
3548
3549 *next_stmt_id += 1;
3550
3551 ident_stack.push(receiver_stream_ident);
3552 }
3553
3554 HydroNode::Counter {
3555 tag,
3556 duration,
3557 prefix,
3558 ..
3559 } => {
3560 let input_ident = ident_stack.pop().unwrap();
3561
3562 let counter_ident =
3563 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3564
3565 match builders_or_callback {
3566 BuildersOrCallback::Builders(graph_builders) => {
3567 let arg = format!("{}({})", prefix, tag);
3568 let builder = graph_builders.get_dfir_mut(&out_location);
3569 builder.add_dfir(
3570 parse_quote! {
3571 #counter_ident = #input_ident -> _counter(#arg, #duration);
3572 },
3573 None,
3574 Some(&next_stmt_id.to_string()),
3575 );
3576 }
3577 BuildersOrCallback::Callback(_, node_callback) => {
3578 node_callback(node, next_stmt_id);
3579 }
3580 }
3581
3582 *next_stmt_id += 1;
3583
3584 ident_stack.push(counter_ident);
3585 }
3586 }
3587 },
3588 seen_tees,
3589 false,
3590 );
3591
3592 ident_stack
3593 .pop()
3594 .expect("ident_stack should have exactly one element after traversal")
3595 }
3596
3597 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3598 match self {
3599 HydroNode::Placeholder => {
3600 panic!()
3601 }
3602 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3603 HydroNode::Source { source, .. } => match source {
3604 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3605 HydroSource::ExternalNetwork()
3606 | HydroSource::Spin()
3607 | HydroSource::ClusterMembers(_, _)
3608 | HydroSource::Embedded(_) => {} },
3610 HydroNode::SingletonSource { value, .. } => {
3611 transform(value);
3612 }
3613 HydroNode::CycleSource { .. }
3614 | HydroNode::Tee { .. }
3615 | HydroNode::YieldConcat { .. }
3616 | HydroNode::BeginAtomic { .. }
3617 | HydroNode::EndAtomic { .. }
3618 | HydroNode::Batch { .. }
3619 | HydroNode::Chain { .. }
3620 | HydroNode::ChainFirst { .. }
3621 | HydroNode::CrossProduct { .. }
3622 | HydroNode::CrossSingleton { .. }
3623 | HydroNode::ResolveFutures { .. }
3624 | HydroNode::ResolveFuturesOrdered { .. }
3625 | HydroNode::Join { .. }
3626 | HydroNode::Difference { .. }
3627 | HydroNode::AntiJoin { .. }
3628 | HydroNode::DeferTick { .. }
3629 | HydroNode::Enumerate { .. }
3630 | HydroNode::Unique { .. }
3631 | HydroNode::Sort { .. } => {}
3632 HydroNode::Map { f, .. }
3633 | HydroNode::FlatMap { f, .. }
3634 | HydroNode::Filter { f, .. }
3635 | HydroNode::FilterMap { f, .. }
3636 | HydroNode::Inspect { f, .. }
3637 | HydroNode::Reduce { f, .. }
3638 | HydroNode::ReduceKeyed { f, .. }
3639 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3640 transform(f);
3641 }
3642 HydroNode::Fold { init, acc, .. }
3643 | HydroNode::Scan { init, acc, .. }
3644 | HydroNode::FoldKeyed { init, acc, .. } => {
3645 transform(init);
3646 transform(acc);
3647 }
3648 HydroNode::Network {
3649 serialize_fn,
3650 deserialize_fn,
3651 ..
3652 } => {
3653 if let Some(serialize_fn) = serialize_fn {
3654 transform(serialize_fn);
3655 }
3656 if let Some(deserialize_fn) = deserialize_fn {
3657 transform(deserialize_fn);
3658 }
3659 }
3660 HydroNode::ExternalInput { deserialize_fn, .. } => {
3661 if let Some(deserialize_fn) = deserialize_fn {
3662 transform(deserialize_fn);
3663 }
3664 }
3665 HydroNode::Counter { duration, .. } => {
3666 transform(duration);
3667 }
3668 }
3669 }
3670
3671 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3672 &self.metadata().op
3673 }
3674
3675 pub fn metadata(&self) -> &HydroIrMetadata {
3676 match self {
3677 HydroNode::Placeholder => {
3678 panic!()
3679 }
3680 HydroNode::Cast { metadata, .. } => metadata,
3681 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3682 HydroNode::Source { metadata, .. } => metadata,
3683 HydroNode::SingletonSource { metadata, .. } => metadata,
3684 HydroNode::CycleSource { metadata, .. } => metadata,
3685 HydroNode::Tee { metadata, .. } => metadata,
3686 HydroNode::YieldConcat { metadata, .. } => metadata,
3687 HydroNode::BeginAtomic { metadata, .. } => metadata,
3688 HydroNode::EndAtomic { metadata, .. } => metadata,
3689 HydroNode::Batch { metadata, .. } => metadata,
3690 HydroNode::Chain { metadata, .. } => metadata,
3691 HydroNode::ChainFirst { metadata, .. } => metadata,
3692 HydroNode::CrossProduct { metadata, .. } => metadata,
3693 HydroNode::CrossSingleton { metadata, .. } => metadata,
3694 HydroNode::Join { metadata, .. } => metadata,
3695 HydroNode::Difference { metadata, .. } => metadata,
3696 HydroNode::AntiJoin { metadata, .. } => metadata,
3697 HydroNode::ResolveFutures { metadata, .. } => metadata,
3698 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3699 HydroNode::Map { metadata, .. } => metadata,
3700 HydroNode::FlatMap { metadata, .. } => metadata,
3701 HydroNode::Filter { metadata, .. } => metadata,
3702 HydroNode::FilterMap { metadata, .. } => metadata,
3703 HydroNode::DeferTick { metadata, .. } => metadata,
3704 HydroNode::Enumerate { metadata, .. } => metadata,
3705 HydroNode::Inspect { metadata, .. } => metadata,
3706 HydroNode::Unique { metadata, .. } => metadata,
3707 HydroNode::Sort { metadata, .. } => metadata,
3708 HydroNode::Scan { metadata, .. } => metadata,
3709 HydroNode::Fold { metadata, .. } => metadata,
3710 HydroNode::FoldKeyed { metadata, .. } => metadata,
3711 HydroNode::Reduce { metadata, .. } => metadata,
3712 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3713 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3714 HydroNode::ExternalInput { metadata, .. } => metadata,
3715 HydroNode::Network { metadata, .. } => metadata,
3716 HydroNode::Counter { metadata, .. } => metadata,
3717 }
3718 }
3719
3720 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3721 &mut self.metadata_mut().op
3722 }
3723
3724 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3725 match self {
3726 HydroNode::Placeholder => {
3727 panic!()
3728 }
3729 HydroNode::Cast { metadata, .. } => metadata,
3730 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3731 HydroNode::Source { metadata, .. } => metadata,
3732 HydroNode::SingletonSource { metadata, .. } => metadata,
3733 HydroNode::CycleSource { metadata, .. } => metadata,
3734 HydroNode::Tee { metadata, .. } => metadata,
3735 HydroNode::YieldConcat { metadata, .. } => metadata,
3736 HydroNode::BeginAtomic { metadata, .. } => metadata,
3737 HydroNode::EndAtomic { metadata, .. } => metadata,
3738 HydroNode::Batch { metadata, .. } => metadata,
3739 HydroNode::Chain { metadata, .. } => metadata,
3740 HydroNode::ChainFirst { metadata, .. } => metadata,
3741 HydroNode::CrossProduct { metadata, .. } => metadata,
3742 HydroNode::CrossSingleton { metadata, .. } => metadata,
3743 HydroNode::Join { metadata, .. } => metadata,
3744 HydroNode::Difference { metadata, .. } => metadata,
3745 HydroNode::AntiJoin { metadata, .. } => metadata,
3746 HydroNode::ResolveFutures { metadata, .. } => metadata,
3747 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3748 HydroNode::Map { metadata, .. } => metadata,
3749 HydroNode::FlatMap { metadata, .. } => metadata,
3750 HydroNode::Filter { metadata, .. } => metadata,
3751 HydroNode::FilterMap { metadata, .. } => metadata,
3752 HydroNode::DeferTick { metadata, .. } => metadata,
3753 HydroNode::Enumerate { metadata, .. } => metadata,
3754 HydroNode::Inspect { metadata, .. } => metadata,
3755 HydroNode::Unique { metadata, .. } => metadata,
3756 HydroNode::Sort { metadata, .. } => metadata,
3757 HydroNode::Scan { metadata, .. } => metadata,
3758 HydroNode::Fold { metadata, .. } => metadata,
3759 HydroNode::FoldKeyed { metadata, .. } => metadata,
3760 HydroNode::Reduce { metadata, .. } => metadata,
3761 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3762 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3763 HydroNode::ExternalInput { metadata, .. } => metadata,
3764 HydroNode::Network { metadata, .. } => metadata,
3765 HydroNode::Counter { metadata, .. } => metadata,
3766 }
3767 }
3768
3769 pub fn input(&self) -> Vec<&HydroNode> {
3770 match self {
3771 HydroNode::Placeholder => {
3772 panic!()
3773 }
3774 HydroNode::Source { .. }
3775 | HydroNode::SingletonSource { .. }
3776 | HydroNode::ExternalInput { .. }
3777 | HydroNode::CycleSource { .. }
3778 | HydroNode::Tee { .. } => {
3779 vec![]
3781 }
3782 HydroNode::Cast { inner, .. }
3783 | HydroNode::ObserveNonDet { inner, .. }
3784 | HydroNode::YieldConcat { inner, .. }
3785 | HydroNode::BeginAtomic { inner, .. }
3786 | HydroNode::EndAtomic { inner, .. }
3787 | HydroNode::Batch { inner, .. } => {
3788 vec![inner]
3789 }
3790 HydroNode::Chain { first, second, .. } => {
3791 vec![first, second]
3792 }
3793 HydroNode::ChainFirst { first, second, .. } => {
3794 vec![first, second]
3795 }
3796 HydroNode::CrossProduct { left, right, .. }
3797 | HydroNode::CrossSingleton { left, right, .. }
3798 | HydroNode::Join { left, right, .. } => {
3799 vec![left, right]
3800 }
3801 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3802 vec![pos, neg]
3803 }
3804 HydroNode::Map { input, .. }
3805 | HydroNode::FlatMap { input, .. }
3806 | HydroNode::Filter { input, .. }
3807 | HydroNode::FilterMap { input, .. }
3808 | HydroNode::Sort { input, .. }
3809 | HydroNode::DeferTick { input, .. }
3810 | HydroNode::Enumerate { input, .. }
3811 | HydroNode::Inspect { input, .. }
3812 | HydroNode::Unique { input, .. }
3813 | HydroNode::Network { input, .. }
3814 | HydroNode::Counter { input, .. }
3815 | HydroNode::ResolveFutures { input, .. }
3816 | HydroNode::ResolveFuturesOrdered { input, .. }
3817 | HydroNode::Fold { input, .. }
3818 | HydroNode::FoldKeyed { input, .. }
3819 | HydroNode::Reduce { input, .. }
3820 | HydroNode::ReduceKeyed { input, .. }
3821 | HydroNode::Scan { input, .. } => {
3822 vec![input]
3823 }
3824 HydroNode::ReduceKeyedWatermark {
3825 input, watermark, ..
3826 } => {
3827 vec![input, watermark]
3828 }
3829 }
3830 }
3831
3832 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3833 self.input()
3834 .iter()
3835 .map(|input_node| input_node.metadata())
3836 .collect()
3837 }
3838
3839 pub fn print_root(&self) -> String {
3840 match self {
3841 HydroNode::Placeholder => {
3842 panic!()
3843 }
3844 HydroNode::Cast { .. } => "Cast()".to_owned(),
3845 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3846 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3847 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3848 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3849 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3850 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3851 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3852 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3853 HydroNode::Batch { .. } => "Batch()".to_owned(),
3854 HydroNode::Chain { first, second, .. } => {
3855 format!("Chain({}, {})", first.print_root(), second.print_root())
3856 }
3857 HydroNode::ChainFirst { first, second, .. } => {
3858 format!(
3859 "ChainFirst({}, {})",
3860 first.print_root(),
3861 second.print_root()
3862 )
3863 }
3864 HydroNode::CrossProduct { left, right, .. } => {
3865 format!(
3866 "CrossProduct({}, {})",
3867 left.print_root(),
3868 right.print_root()
3869 )
3870 }
3871 HydroNode::CrossSingleton { left, right, .. } => {
3872 format!(
3873 "CrossSingleton({}, {})",
3874 left.print_root(),
3875 right.print_root()
3876 )
3877 }
3878 HydroNode::Join { left, right, .. } => {
3879 format!("Join({}, {})", left.print_root(), right.print_root())
3880 }
3881 HydroNode::Difference { pos, neg, .. } => {
3882 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3883 }
3884 HydroNode::AntiJoin { pos, neg, .. } => {
3885 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3886 }
3887 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3888 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3889 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3890 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3891 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3892 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3893 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3894 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3895 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3896 HydroNode::Unique { .. } => "Unique()".to_owned(),
3897 HydroNode::Sort { .. } => "Sort()".to_owned(),
3898 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3899 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3900 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3901 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3902 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3903 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3904 HydroNode::Network { .. } => "Network()".to_owned(),
3905 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3906 HydroNode::Counter { tag, duration, .. } => {
3907 format!("Counter({:?}, {:?})", tag, duration)
3908 }
3909 }
3910 }
3911}
3912
/// Sets up a network channel from `from_location` to `to_location` using the
/// deployment backend `D`.
///
/// The endpoints are looked up in `processes` / `clusters`, one port is
/// allocated on each via `next_port()`, and the pair of endpoint kinds selects
/// the backend functions to call:
///
/// | from \ to | process | cluster |
/// |-----------|---------|---------|
/// | process   | `o2o_*` | `o2m_*` |
/// | cluster   | `m2o_*` | `m2m_*` |
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// backend's `*_sink_source` function and the closure produced by its
/// `*_connect` function.
///
/// # Panics
/// Panics if an endpoint is missing from `processes` / `clusters`, or if
/// either location is a `LocationId::Tick` or `LocationId::Atomic`.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetch an owned handle for an instantiated process, or abort with a message.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    // Fetch an owned handle for an instantiated cluster, or abort with a message.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::o2o_sink_source(env, &sender, &sink_port, &receiver, &source_port, name);
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::o2m_sink_source(env, &sender, &sink_port, &receiver, &source_port, name);
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::m2o_sink_source(env, &sender, &sink_port, &receiver, &source_port, name);
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::m2m_sink_source(env, &sender, &sink_port, &receiver, &source_port, name);
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        // Tick and atomic locations are rejected on either side of the channel.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };

    (sink, source, connect_fn)
}
4021
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` at 248 bytes so that growth of
    /// the enum is noticed. Ignored without the `build` feature because the
    /// expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 248);
    }

    /// Pins the in-memory size of `HydroRoot` at 136 bytes so that growth of
    /// the type is noticed. Ignored without the `build` feature because the
    /// expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// `simplify_q_macro` must hand back a plain expression unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let simplified = simplify_q_macro(plain.clone());
        assert_eq!(simplified, plain);
    }

    /// Snapshot of `simplify_q_macro` applied to a real spliced `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot of `simplify_q_macro` applied to a spliced `q!` block whose
    /// closure is preceded by a `let` statement, so the quoted code does not
    /// begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}