Skip to main content

hydro_lang/compile/
embedded.rs

1//! "Embedded" deployment backend for Hydro.
2//!
3//! Instead of compiling each location into a standalone binary, this backend generates
4//! a Rust source file containing one function per location. Each function returns a
5//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller.
6//!
7//! This is useful when you want full control over where and how the projected DFIR
8//! code runs (e.g. embedding it into an existing application).
9//!
10//! # Networking
11//!
//! Named network channels between locations are supported: process-to-process (o2o)
//! as well as channels to and from clusters (o2m, m2o, m2m). When a location has network
//! sends or receives, the generated function takes additional `__network_out` and
//! `__network_in` parameters whose types are generated structs with one field per
//! network port (named after the channel). Network channels must be named via
//! `.name()` on the networking config.
//!
//! - Sinks (`EmbeddedNetworkOut`): one `FnMut(Bytes)` field per outgoing channel
//!   (`FnMut((TaglessMemberId, Bytes))` when the receiving side is a cluster).
//! - Sources (`EmbeddedNetworkIn`): one `Stream<Item = Result<BytesMut, io::Error>>`
//!   field per incoming channel (the `Ok` payload is `(TaglessMemberId, BytesMut)`
//!   when the sending side is a cluster).
//!
//! The caller is responsible for wiring these together (e.g. via in-memory channels,
//! sockets, etc.). External ports are not supported.
24
25use std::future::Future;
26use std::io::Error;
27use std::pin::Pin;
28
29use bytes::{Bytes, BytesMut};
30use dfir_lang::diagnostic::Diagnostics;
31use dfir_lang::graph::DfirGraph;
32use futures::{Sink, Stream};
33use proc_macro2::Span;
34use quote::quote;
35use serde::Serialize;
36use serde::de::DeserializeOwned;
37use slotmap::SparseSecondaryMap;
38use stageleft::{QuotedWithContext, q};
39
40use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
41use crate::compile::builder::ExternalPortId;
42use crate::location::dynamic::LocationId;
43use crate::location::member_id::TaglessMemberId;
44use crate::location::{LocationKey, MembershipEvent, NetworkHint};
45
/// Marker type for the embedded deployment backend.
///
/// This is an uninhabited enum: it is never instantiated and only serves as the
/// type parameter that selects the embedded implementations of the deploy traits.
///
/// Named channels between processes and clusters are supported (the generated
/// functions take the network sinks and sources as parameters). Everything that
/// involves an external location or external port panics.
pub enum EmbeddedDeploy {}
50
/// A trivial node type for embedded deployment. Stores a user-provided function name.
///
/// The same type is used for processes, clusters, and externals in the embedded
/// backend; it carries only the data needed at code-generation time.
#[derive(Clone)]
pub struct EmbeddedNode {
    /// The function name to use in the generated code for this location.
    pub fn_name: String,
    /// The location key for this node, used to register network ports.
    pub location_key: LocationKey,
}
59
// `Node` implementation for the embedded backend. Ports and metadata are unit
// types, so every method here is effectively a no-op.
impl Node for EmbeddedNode {
    type Port = ();
    type Meta = ();
    type InstantiateEnv = EmbeddedInstantiateEnv;

    // Ports carry no information in this backend; returns `()`.
    fn next_port(&self) -> Self::Port {}

    // There is no metadata to update (`Meta = ()`).
    fn update_meta(&self, _meta: &Self::Meta) {}

    fn instantiate(
        &self,
        _env: &mut Self::InstantiateEnv,
        _meta: &mut Self::Meta,
        _graph: DfirGraph,
        _extra_stmts: &[syn::Stmt],
        _sidecars: &[syn::Expr],
    ) {
        // No-op: embedded mode doesn't instantiate nodes at deploy time.
    }
}
80
// External ports are not supported by the embedded backend. `register` panics
// immediately; the `as_*` methods return a future that panics when polled.
impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
    // Always panics: there are no external ports to register.
    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
        panic!("EmbeddedDeploy does not support external ports");
    }

    // Returns a future whose body panics.
    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
    fn as_bytes_bidi(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<
        Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
    > + 'a {
        async { panic!("EmbeddedDeploy does not support external ports") }
    }

    // Returns a future whose body panics.
    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
    fn as_bincode_bidi<InT, OutT>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
    where
        InT: Serialize + 'static,
        OutT: DeserializeOwned + 'static,
    {
        async { panic!("EmbeddedDeploy does not support external ports") }
    }

    // Returns a future whose body panics.
    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
    fn as_bincode_sink<T>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
    where
        T: Serialize + 'static,
    {
        async { panic!("EmbeddedDeploy does not support external ports") }
    }

    // Returns a future whose body panics.
    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
    fn as_bincode_source<T>(
        &self,
        _external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
    where
        T: DeserializeOwned + 'static,
    {
        async { panic!("EmbeddedDeploy does not support external ports") }
    }
}
130
131impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
132    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
133        EmbeddedNode {
134            fn_name: self.into(),
135            location_key,
136        }
137    }
138}
139
140impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
141    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
142        EmbeddedNode {
143            fn_name: self.into(),
144            location_key,
145        }
146    }
147}
148
149impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
150    fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
151        EmbeddedNode {
152            fn_name: self.into(),
153            location_key,
154        }
155    }
156}
157
/// Collected embedded input/output registrations, keyed by location.
///
/// During `compile_network`, each `HydroSource::Embedded` and `HydroRoot::EmbeddedOutput`
/// IR node registers its ident, element type, and location key here.
/// `generate_embedded` then uses this to add the appropriate parameters
/// to each generated function.
///
/// Network channels and cluster membership requests are recorded here as well,
/// by the `Deploy` implementation for `EmbeddedDeploy`.
#[derive(Default)]
pub struct EmbeddedInstantiateEnv {
    /// (ident name, element type) pairs per location key, for inputs.
    pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
    /// (ident name, element type) pairs per location key, for outputs.
    pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
    /// Network output port names per location key (sender side of channels).
    /// Each entry is (port_name, is_tagged) where is_tagged means the type is (TaglessMemberId, Bytes).
    pub network_outputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
    /// Network input port names per location key (receiver side of channels).
    /// Each entry is (port_name, is_tagged) where is_tagged means the type is Result<(TaglessMemberId, BytesMut), Error>.
    pub network_inputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
    /// Cluster membership streams needed per location key.
    /// Maps location_key -> vec of cluster LocationKeys whose membership is needed.
    /// The position of a cluster key in the vec is the index handed to the
    /// membership-stream runtime helper, so entries must not be reordered.
    pub membership_streams: SparseSecondaryMap<LocationKey, Vec<LocationKey>>,
}
180
181impl<'a> Deploy<'a> for EmbeddedDeploy {
182    type Meta = ();
183    type InstantiateEnv = EmbeddedInstantiateEnv;
184
185    type Process = EmbeddedNode;
186    type Cluster = EmbeddedNode;
187    type External = EmbeddedNode;
188
189    fn o2o_sink_source(
190        env: &mut Self::InstantiateEnv,
191        p1: &Self::Process,
192        _p1_port: &(),
193        p2: &Self::Process,
194        _p2_port: &(),
195        name: Option<&str>,
196    ) -> (syn::Expr, syn::Expr) {
197        let name = name.expect(
198            "EmbeddedDeploy o2o networking requires a channel name. Use `TCP.name(\"my_channel\")` to provide one.",
199        );
200
201        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
202        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
203
204        env.network_outputs
205            .entry(p1.location_key)
206            .unwrap()
207            .or_default()
208            .push((name.to_owned(), false));
209        env.network_inputs
210            .entry(p2.location_key)
211            .unwrap()
212            .or_default()
213            .push((name.to_owned(), false));
214
215        (
216            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
217            syn::parse_quote!(#source_ident),
218        )
219    }
220
221    fn o2o_connect(
222        _p1: &Self::Process,
223        _p1_port: &(),
224        _p2: &Self::Process,
225        _p2_port: &(),
226    ) -> Box<dyn FnOnce()> {
227        Box::new(|| {})
228    }
229
230    fn o2m_sink_source(
231        env: &mut Self::InstantiateEnv,
232        p1: &Self::Process,
233        _p1_port: &(),
234        c2: &Self::Cluster,
235        _c2_port: &(),
236        name: Option<&str>,
237    ) -> (syn::Expr, syn::Expr) {
238        let name = name.expect("EmbeddedDeploy o2m networking requires a channel name.");
239        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
240        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
241        env.network_outputs
242            .entry(p1.location_key)
243            .unwrap()
244            .or_default()
245            .push((name.to_owned(), true));
246        env.network_inputs
247            .entry(c2.location_key)
248            .unwrap()
249            .or_default()
250            .push((name.to_owned(), false));
251        (
252            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
253            syn::parse_quote!(#source_ident),
254        )
255    }
256
257    fn o2m_connect(
258        _p1: &Self::Process,
259        _p1_port: &(),
260        _c2: &Self::Cluster,
261        _c2_port: &(),
262    ) -> Box<dyn FnOnce()> {
263        Box::new(|| {})
264    }
265
266    fn m2o_sink_source(
267        env: &mut Self::InstantiateEnv,
268        c1: &Self::Cluster,
269        _c1_port: &(),
270        p2: &Self::Process,
271        _p2_port: &(),
272        name: Option<&str>,
273    ) -> (syn::Expr, syn::Expr) {
274        let name = name.expect("EmbeddedDeploy m2o networking requires a channel name.");
275        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
276        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
277        env.network_outputs
278            .entry(c1.location_key)
279            .unwrap()
280            .or_default()
281            .push((name.to_owned(), false));
282        env.network_inputs
283            .entry(p2.location_key)
284            .unwrap()
285            .or_default()
286            .push((name.to_owned(), true));
287        (
288            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
289            syn::parse_quote!(#source_ident),
290        )
291    }
292
293    fn m2o_connect(
294        _c1: &Self::Cluster,
295        _c1_port: &(),
296        _p2: &Self::Process,
297        _p2_port: &(),
298    ) -> Box<dyn FnOnce()> {
299        Box::new(|| {})
300    }
301
302    fn m2m_sink_source(
303        env: &mut Self::InstantiateEnv,
304        c1: &Self::Cluster,
305        _c1_port: &(),
306        c2: &Self::Cluster,
307        _c2_port: &(),
308        name: Option<&str>,
309    ) -> (syn::Expr, syn::Expr) {
310        let name = name.expect("EmbeddedDeploy m2m networking requires a channel name.");
311        let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
312        let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
313        env.network_outputs
314            .entry(c1.location_key)
315            .unwrap()
316            .or_default()
317            .push((name.to_owned(), true));
318        env.network_inputs
319            .entry(c2.location_key)
320            .unwrap()
321            .or_default()
322            .push((name.to_owned(), true));
323        (
324            syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
325            syn::parse_quote!(#source_ident),
326        )
327    }
328
329    fn m2m_connect(
330        _c1: &Self::Cluster,
331        _c1_port: &(),
332        _c2: &Self::Cluster,
333        _c2_port: &(),
334    ) -> Box<dyn FnOnce()> {
335        Box::new(|| {})
336    }
337
338    fn e2o_many_source(
339        _extra_stmts: &mut Vec<syn::Stmt>,
340        _p2: &Self::Process,
341        _p2_port: &(),
342        _codec_type: &syn::Type,
343        _shared_handle: String,
344    ) -> syn::Expr {
345        panic!("EmbeddedDeploy does not support networking (e2o)")
346    }
347
348    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
349        panic!("EmbeddedDeploy does not support networking (e2o)")
350    }
351
352    fn e2o_source(
353        _extra_stmts: &mut Vec<syn::Stmt>,
354        _p1: &Self::External,
355        _p1_port: &(),
356        _p2: &Self::Process,
357        _p2_port: &(),
358        _codec_type: &syn::Type,
359        _shared_handle: String,
360    ) -> syn::Expr {
361        panic!("EmbeddedDeploy does not support networking (e2o)")
362    }
363
364    fn e2o_connect(
365        _p1: &Self::External,
366        _p1_port: &(),
367        _p2: &Self::Process,
368        _p2_port: &(),
369        _many: bool,
370        _server_hint: NetworkHint,
371    ) -> Box<dyn FnOnce()> {
372        panic!("EmbeddedDeploy does not support networking (e2o)")
373    }
374
375    fn o2e_sink(
376        _p1: &Self::Process,
377        _p1_port: &(),
378        _p2: &Self::External,
379        _p2_port: &(),
380        _shared_handle: String,
381    ) -> syn::Expr {
382        panic!("EmbeddedDeploy does not support networking (o2e)")
383    }
384
385    #[expect(
386        unreachable_code,
387        reason = "panic before q! which is only for return type"
388    )]
389    fn cluster_ids(
390        _of_cluster: LocationKey,
391    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
392        panic!("EmbeddedDeploy does not support cluster IDs");
393        q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
394    }
395
396    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
397        super::embedded_runtime::embedded_cluster_self_id()
398    }
399
400    fn cluster_membership_stream(
401        env: &mut Self::InstantiateEnv,
402        at_location: &LocationId,
403        location_id: &LocationId,
404    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
405    {
406        let at_key = match at_location {
407            LocationId::Process(key) | LocationId::Cluster(key) => *key,
408            _ => panic!("cluster_membership_stream must be called from a process or cluster"),
409        };
410        let cluster_key = match location_id {
411            LocationId::Cluster(key) => *key,
412            _ => panic!("cluster_membership_stream target must be a cluster"),
413        };
414        let vec = env.membership_streams.entry(at_key).unwrap().or_default();
415        let idx = if let Some(pos) = vec.iter().position(|k| *k == cluster_key) {
416            pos
417        } else {
418            vec.push(cluster_key);
419            vec.len() - 1
420        };
421
422        super::embedded_runtime::embedded_cluster_membership_stream(idx)
423    }
424
425    fn register_embedded_input(
426        env: &mut Self::InstantiateEnv,
427        location_key: LocationKey,
428        ident: &syn::Ident,
429        element_type: &syn::Type,
430    ) {
431        env.inputs
432            .entry(location_key)
433            .unwrap()
434            .or_default()
435            .push((ident.clone(), element_type.clone()));
436    }
437
438    fn register_embedded_output(
439        env: &mut Self::InstantiateEnv,
440        location_key: LocationKey,
441        ident: &syn::Ident,
442        element_type: &syn::Type,
443    ) {
444        env.outputs
445            .entry(location_key)
446            .unwrap()
447            .or_default()
448            .push((ident.clone(), element_type.clone()));
449    }
450}
451
452impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
453    /// Generates a `syn::File` containing one function per location in the flow.
454    ///
455    /// Each generated function has the signature:
456    /// ```ignore
457    /// pub fn <fn_name>() -> dfir_rs::scheduled::graph::Dfir<'static>
458    /// ```
459    /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`.
460    ///
461    /// The returned `Dfir` can be manually executed by the caller.
462    ///
463    /// # Arguments
464    ///
465    /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft
466    ///   re-exports). Hyphens will be replaced with underscores.
467    ///
468    /// # Usage
469    ///
470    /// Typically called from a `build.rs` in a wrapper crate:
471    /// ```ignore
472    /// // build.rs
473    /// let deploy = flow.with_process(&process, "my_fn".to_string());
474    /// let code = deploy.generate_embedded("my_hydro_crate");
475    /// let out_dir = std::env::var("OUT_DIR").unwrap();
476    /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap();
477    /// ```
478    ///
479    /// Then in `lib.rs`:
480    /// ```ignore
481    /// include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
482    /// ```
483    pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
484        let mut env = EmbeddedInstantiateEnv::default();
485        let compiled = self.compile_internal(&mut env);
486
487        let root = crate::staging_util::get_this_crate();
488        let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
489
490        let mut items: Vec<syn::Item> = Vec::new();
491
492        // Sort location keys for deterministic output.
493        let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
494        location_keys.sort();
495
496        // Build a map from location key to fn_name for lookups.
497        let fn_names: SparseSecondaryMap<LocationKey, &str> = location_keys
498            .iter()
499            .map(|&k| {
500                let name = self
501                    .processes
502                    .get(k)
503                    .map(|n| n.fn_name.as_str())
504                    .or_else(|| self.clusters.get(k).map(|n| n.fn_name.as_str()))
505                    .or_else(|| self.externals.get(k).map(|n| n.fn_name.as_str()))
506                    .expect("location key not found in any node map");
507                (k, name)
508            })
509            .collect();
510
511        for location_key in location_keys {
512            let graph = &compiled.all_dfir()[location_key];
513
514            // Get the user-provided function name from the node.
515            let fn_name = fn_names[location_key];
516            let fn_ident = syn::Ident::new(fn_name, Span::call_site());
517
518            // Get inputs for this location, sorted by name.
519            let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
520            loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
521
522            // Get outputs for this location, sorted by name.
523            let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
524            loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
525
526            let mut diagnostics = Diagnostics::new();
527            let dfir_tokens = graph
528                .as_code(&quote! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
529                .expect("DFIR code generation failed with diagnostics.");
530
531            // --- Build module items (cluster info, output struct, network structs) ---
532            let mut mod_items: Vec<proc_macro2::TokenStream> = Vec::new();
533            let mut extra_fn_generics: Vec<proc_macro2::TokenStream> = Vec::new();
534            let mut cluster_params: Vec<proc_macro2::TokenStream> = Vec::new();
535            let mut output_params: Vec<proc_macro2::TokenStream> = Vec::new();
536            let mut net_out_params: Vec<proc_macro2::TokenStream> = Vec::new();
537            let mut net_in_params: Vec<proc_macro2::TokenStream> = Vec::new();
538            let mut extra_destructure: Vec<proc_macro2::TokenStream> = Vec::new();
539
540            // For cluster locations, add self_id parameter.
541            if self.clusters.contains_key(location_key) {
542                cluster_params.push(quote! {
543                    __cluster_self_id: &'a #root::location::member_id::TaglessMemberId
544                });
545                // Alias to the name the generated DFIR code expects.
546                let self_id_ident = syn::Ident::new(
547                    &format!("__hydro_lang_cluster_self_id_{}", location_key),
548                    Span::call_site(),
549                );
550                extra_destructure.push(quote! {
551                    let #self_id_ident = __cluster_self_id;
552                });
553            }
554
555            // For any location that needs cluster membership streams, add parameters.
556            if let Some(loc_memberships) = env.membership_streams.get(location_key) {
557                let membership_struct_ident =
558                    syn::Ident::new("EmbeddedMembershipStreams", Span::call_site());
559
560                let mem_generic_idents: Vec<syn::Ident> = loc_memberships
561                    .iter()
562                    .enumerate()
563                    .map(|(i, _)| quote::format_ident!("__Mem{}", i))
564                    .collect();
565
566                let mem_field_names: Vec<syn::Ident> = loc_memberships
567                    .iter()
568                    .map(|k| {
569                        let cluster_fn_name = fn_names[*k];
570                        syn::Ident::new(cluster_fn_name, Span::call_site())
571                    })
572                    .collect();
573
574                let struct_fields: Vec<proc_macro2::TokenStream> = mem_field_names
575                    .iter()
576                    .zip(mem_generic_idents.iter())
577                    .map(|(field, generic)| {
578                        quote! { pub #field: #generic }
579                    })
580                    .collect();
581
582                let struct_generics: Vec<proc_macro2::TokenStream> = mem_generic_idents
583                    .iter()
584                    .map(|generic| {
585                        quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin }
586                    })
587                    .collect();
588
589                for generic in &mem_generic_idents {
590                    extra_fn_generics.push(
591                        quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin + 'a },
592                    );
593                }
594
595                cluster_params.push(quote! {
596                    __membership: #fn_ident::#membership_struct_ident<#(#mem_generic_idents),*>
597                });
598
599                for (i, field) in mem_field_names.iter().enumerate() {
600                    let var_ident =
601                        syn::Ident::new(&format!("__membership_{}", i), Span::call_site());
602                    extra_destructure.push(quote! {
603                        let #var_ident = __membership.#field;
604                    });
605                }
606
607                mod_items.push(quote! {
608                    pub struct #membership_struct_ident<#(#struct_generics),*> {
609                        #(#struct_fields),*
610                    }
611                });
612            }
613
614            // Embedded inputs (Stream sources).
615            let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
616                .iter()
617                .map(|(ident, element_type)| {
618                    quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
619                })
620                .collect();
621
622            // Embedded outputs (FnMut callbacks).
623            if !loc_outputs.is_empty() {
624                let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
625
626                let output_generic_idents: Vec<syn::Ident> = loc_outputs
627                    .iter()
628                    .enumerate()
629                    .map(|(i, _)| quote::format_ident!("__Out{}", i))
630                    .collect();
631
632                let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
633                    .iter()
634                    .zip(output_generic_idents.iter())
635                    .map(|((ident, _), generic)| {
636                        quote! { pub #ident: #generic }
637                    })
638                    .collect();
639
640                let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
641                    .iter()
642                    .zip(output_generic_idents.iter())
643                    .map(|((_, element_type), generic)| {
644                        quote! { #generic: FnMut(#element_type) }
645                    })
646                    .collect();
647
648                for ((_, element_type), generic) in
649                    loc_outputs.iter().zip(output_generic_idents.iter())
650                {
651                    extra_fn_generics.push(quote! { #generic: FnMut(#element_type) + 'a });
652                }
653
654                output_params.push(quote! {
655                    __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
656                });
657
658                for (ident, _) in &loc_outputs {
659                    extra_destructure.push(quote! { let mut #ident = &mut __outputs.#ident; });
660                }
661
662                mod_items.push(quote! {
663                    pub struct #output_struct_ident<#(#struct_generics),*> {
664                        #(#struct_fields),*
665                    }
666                });
667            }
668
669            // Network outputs (FnMut sinks).
670            if let Some(mut loc_net_outputs) = env.network_outputs.remove(location_key) {
671                loc_net_outputs.sort();
672
673                let net_out_struct_ident = syn::Ident::new("EmbeddedNetworkOut", Span::call_site());
674
675                let net_out_generic_idents: Vec<syn::Ident> = loc_net_outputs
676                    .iter()
677                    .enumerate()
678                    .map(|(i, _)| quote::format_ident!("__NetOut{}", i))
679                    .collect();
680
681                let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_outputs
682                    .iter()
683                    .zip(net_out_generic_idents.iter())
684                    .map(|((name, _), generic)| {
685                        let field_ident = syn::Ident::new(name, Span::call_site());
686                        quote! { pub #field_ident: #generic }
687                    })
688                    .collect();
689
690                let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_outputs
691                    .iter()
692                    .zip(net_out_generic_idents.iter())
693                    .map(|((_, is_tagged), generic)| {
694                        if *is_tagged {
695                            quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) }
696                        } else {
697                            quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) }
698                        }
699                    })
700                    .collect();
701
702                for ((_, is_tagged), generic) in
703                    loc_net_outputs.iter().zip(net_out_generic_idents.iter())
704                {
705                    if *is_tagged {
706                        extra_fn_generics.push(
707                            quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) + 'a },
708                        );
709                    } else {
710                        extra_fn_generics.push(
711                            quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) + 'a },
712                        );
713                    }
714                }
715
716                net_out_params.push(quote! {
717                    __network_out: &'a mut #fn_ident::#net_out_struct_ident<#(#net_out_generic_idents),*>
718                });
719
720                for (name, _) in &loc_net_outputs {
721                    let field_ident = syn::Ident::new(name, Span::call_site());
722                    let var_ident =
723                        syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
724                    extra_destructure
725                        .push(quote! { let mut #var_ident = &mut __network_out.#field_ident; });
726                }
727
728                mod_items.push(quote! {
729                    pub struct #net_out_struct_ident<#(#struct_generics),*> {
730                        #(#struct_fields),*
731                    }
732                });
733            }
734
735            // Network inputs (Stream sources).
736            if let Some(mut loc_net_inputs) = env.network_inputs.remove(location_key) {
737                loc_net_inputs.sort();
738
739                let net_in_struct_ident = syn::Ident::new("EmbeddedNetworkIn", Span::call_site());
740
741                let net_in_generic_idents: Vec<syn::Ident> = loc_net_inputs
742                    .iter()
743                    .enumerate()
744                    .map(|(i, _)| quote::format_ident!("__NetIn{}", i))
745                    .collect();
746
747                let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_inputs
748                    .iter()
749                    .zip(net_in_generic_idents.iter())
750                    .map(|((name, _), generic)| {
751                        let field_ident = syn::Ident::new(name, Span::call_site());
752                        quote! { pub #field_ident: #generic }
753                    })
754                    .collect();
755
756                let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_inputs
757                    .iter()
758                    .zip(net_in_generic_idents.iter())
759                    .map(|((_, is_tagged), generic)| {
760                        if *is_tagged {
761                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin }
762                        } else {
763                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin }
764                        }
765                    })
766                    .collect();
767
768                for ((_, is_tagged), generic) in
769                    loc_net_inputs.iter().zip(net_in_generic_idents.iter())
770                {
771                    if *is_tagged {
772                        extra_fn_generics.push(
773                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin + 'a },
774                        );
775                    } else {
776                        extra_fn_generics.push(
777                            quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin + 'a },
778                        );
779                    }
780                }
781
782                net_in_params.push(quote! {
783                    __network_in: #fn_ident::#net_in_struct_ident<#(#net_in_generic_idents),*>
784                });
785
786                for (name, _) in &loc_net_inputs {
787                    let field_ident = syn::Ident::new(name, Span::call_site());
788                    let var_ident =
789                        syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
790                    extra_destructure.push(quote! { let #var_ident = __network_in.#field_ident; });
791                }
792
793                mod_items.push(quote! {
794                    pub struct #net_in_struct_ident<#(#struct_generics),*> {
795                        #(#struct_fields),*
796                    }
797                });
798            }
799
800            // Emit the module if there are any structs.
801            if !mod_items.is_empty() {
802                let output_mod: syn::Item = syn::parse_quote! {
803                    pub mod #fn_ident {
804                        use super::*;
805                        #(#mod_items)*
806                    }
807                };
808                items.push(output_mod);
809            }
810
811            // Build the function.
812            let all_params: Vec<proc_macro2::TokenStream> = cluster_params
813                .into_iter()
814                .chain(input_params)
815                .chain(output_params)
816                .chain(net_in_params)
817                .chain(net_out_params)
818                .collect();
819
820            let func = if !extra_fn_generics.is_empty() {
821                syn::parse_quote! {
822                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
823                    pub fn #fn_ident<'a, #(#extra_fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
824                        #(#extra_destructure)*
825                        #dfir_tokens
826                    }
827                }
828            } else {
829                syn::parse_quote! {
830                    #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
831                    pub fn #fn_ident<'a>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
832                        #dfir_tokens
833                    }
834                }
835            };
836
837            items.push(func);
838        }
839
840        syn::parse_quote! {
841            use #orig_crate_name::__staged::__deps::*;
842            use #root::prelude::*;
843            use #root::runtime_support::dfir_rs as __root_dfir_rs;
844            pub use #orig_crate_name::__staged;
845
846            #( #items )*
847        }
848    }
849}