1use std::future::Future;
26use std::io::Error;
27use std::pin::Pin;
28
29use bytes::{Bytes, BytesMut};
30use dfir_lang::diagnostic::Diagnostics;
31use dfir_lang::graph::DfirGraph;
32use futures::{Sink, Stream};
33use proc_macro2::Span;
34use quote::quote;
35use serde::Serialize;
36use serde::de::DeserializeOwned;
37use slotmap::SparseSecondaryMap;
38use stageleft::{QuotedWithContext, q};
39
40use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
41use crate::compile::builder::ExternalPortId;
42use crate::location::dynamic::LocationId;
43use crate::location::member_id::TaglessMemberId;
44use crate::location::{LocationKey, MembershipEvent, NetworkHint};
45
46pub enum EmbeddedDeploy {}
50
51#[derive(Clone)]
53pub struct EmbeddedNode {
54 pub fn_name: String,
56 pub location_key: LocationKey,
58}
59
60impl Node for EmbeddedNode {
61 type Port = ();
62 type Meta = ();
63 type InstantiateEnv = EmbeddedInstantiateEnv;
64
65 fn next_port(&self) -> Self::Port {}
66
67 fn update_meta(&self, _meta: &Self::Meta) {}
68
69 fn instantiate(
70 &self,
71 _env: &mut Self::InstantiateEnv,
72 _meta: &mut Self::Meta,
73 _graph: DfirGraph,
74 _extra_stmts: &[syn::Stmt],
75 _sidecars: &[syn::Expr],
76 ) {
77 }
79}
80
81impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
82 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
83 panic!("EmbeddedDeploy does not support external ports");
84 }
85
86 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
87 fn as_bytes_bidi(
88 &self,
89 _external_port_id: ExternalPortId,
90 ) -> impl Future<
91 Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
92 > + 'a {
93 async { panic!("EmbeddedDeploy does not support external ports") }
94 }
95
96 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97 fn as_bincode_bidi<InT, OutT>(
98 &self,
99 _external_port_id: ExternalPortId,
100 ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
101 where
102 InT: Serialize + 'static,
103 OutT: DeserializeOwned + 'static,
104 {
105 async { panic!("EmbeddedDeploy does not support external ports") }
106 }
107
108 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
109 fn as_bincode_sink<T>(
110 &self,
111 _external_port_id: ExternalPortId,
112 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
113 where
114 T: Serialize + 'static,
115 {
116 async { panic!("EmbeddedDeploy does not support external ports") }
117 }
118
119 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
120 fn as_bincode_source<T>(
121 &self,
122 _external_port_id: ExternalPortId,
123 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
124 where
125 T: DeserializeOwned + 'static,
126 {
127 async { panic!("EmbeddedDeploy does not support external ports") }
128 }
129}
130
131impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
132 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
133 EmbeddedNode {
134 fn_name: self.into(),
135 location_key,
136 }
137 }
138}
139
140impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
141 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
142 EmbeddedNode {
143 fn_name: self.into(),
144 location_key,
145 }
146 }
147}
148
149impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
150 fn build(self, location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
151 EmbeddedNode {
152 fn_name: self.into(),
153 location_key,
154 }
155 }
156}
157
158#[derive(Default)]
165pub struct EmbeddedInstantiateEnv {
166 pub inputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
168 pub outputs: SparseSecondaryMap<LocationKey, Vec<(syn::Ident, syn::Type)>>,
170 pub network_outputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
173 pub network_inputs: SparseSecondaryMap<LocationKey, Vec<(String, bool)>>,
176 pub membership_streams: SparseSecondaryMap<LocationKey, Vec<LocationKey>>,
179}
180
181impl<'a> Deploy<'a> for EmbeddedDeploy {
182 type Meta = ();
183 type InstantiateEnv = EmbeddedInstantiateEnv;
184
185 type Process = EmbeddedNode;
186 type Cluster = EmbeddedNode;
187 type External = EmbeddedNode;
188
189 fn o2o_sink_source(
190 env: &mut Self::InstantiateEnv,
191 p1: &Self::Process,
192 _p1_port: &(),
193 p2: &Self::Process,
194 _p2_port: &(),
195 name: Option<&str>,
196 ) -> (syn::Expr, syn::Expr) {
197 let name = name.expect(
198 "EmbeddedDeploy o2o networking requires a channel name. Use `TCP.name(\"my_channel\")` to provide one.",
199 );
200
201 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
202 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
203
204 env.network_outputs
205 .entry(p1.location_key)
206 .unwrap()
207 .or_default()
208 .push((name.to_owned(), false));
209 env.network_inputs
210 .entry(p2.location_key)
211 .unwrap()
212 .or_default()
213 .push((name.to_owned(), false));
214
215 (
216 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
217 syn::parse_quote!(#source_ident),
218 )
219 }
220
221 fn o2o_connect(
222 _p1: &Self::Process,
223 _p1_port: &(),
224 _p2: &Self::Process,
225 _p2_port: &(),
226 ) -> Box<dyn FnOnce()> {
227 Box::new(|| {})
228 }
229
230 fn o2m_sink_source(
231 env: &mut Self::InstantiateEnv,
232 p1: &Self::Process,
233 _p1_port: &(),
234 c2: &Self::Cluster,
235 _c2_port: &(),
236 name: Option<&str>,
237 ) -> (syn::Expr, syn::Expr) {
238 let name = name.expect("EmbeddedDeploy o2m networking requires a channel name.");
239 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
240 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
241 env.network_outputs
242 .entry(p1.location_key)
243 .unwrap()
244 .or_default()
245 .push((name.to_owned(), true));
246 env.network_inputs
247 .entry(c2.location_key)
248 .unwrap()
249 .or_default()
250 .push((name.to_owned(), false));
251 (
252 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
253 syn::parse_quote!(#source_ident),
254 )
255 }
256
257 fn o2m_connect(
258 _p1: &Self::Process,
259 _p1_port: &(),
260 _c2: &Self::Cluster,
261 _c2_port: &(),
262 ) -> Box<dyn FnOnce()> {
263 Box::new(|| {})
264 }
265
266 fn m2o_sink_source(
267 env: &mut Self::InstantiateEnv,
268 c1: &Self::Cluster,
269 _c1_port: &(),
270 p2: &Self::Process,
271 _p2_port: &(),
272 name: Option<&str>,
273 ) -> (syn::Expr, syn::Expr) {
274 let name = name.expect("EmbeddedDeploy m2o networking requires a channel name.");
275 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
276 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
277 env.network_outputs
278 .entry(c1.location_key)
279 .unwrap()
280 .or_default()
281 .push((name.to_owned(), false));
282 env.network_inputs
283 .entry(p2.location_key)
284 .unwrap()
285 .or_default()
286 .push((name.to_owned(), true));
287 (
288 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
289 syn::parse_quote!(#source_ident),
290 )
291 }
292
293 fn m2o_connect(
294 _c1: &Self::Cluster,
295 _c1_port: &(),
296 _p2: &Self::Process,
297 _p2_port: &(),
298 ) -> Box<dyn FnOnce()> {
299 Box::new(|| {})
300 }
301
302 fn m2m_sink_source(
303 env: &mut Self::InstantiateEnv,
304 c1: &Self::Cluster,
305 _c1_port: &(),
306 c2: &Self::Cluster,
307 _c2_port: &(),
308 name: Option<&str>,
309 ) -> (syn::Expr, syn::Expr) {
310 let name = name.expect("EmbeddedDeploy m2m networking requires a channel name.");
311 let sink_ident = syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
312 let source_ident = syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
313 env.network_outputs
314 .entry(c1.location_key)
315 .unwrap()
316 .or_default()
317 .push((name.to_owned(), true));
318 env.network_inputs
319 .entry(c2.location_key)
320 .unwrap()
321 .or_default()
322 .push((name.to_owned(), true));
323 (
324 syn::parse_quote!(__root_dfir_rs::sinktools::for_each(#sink_ident)),
325 syn::parse_quote!(#source_ident),
326 )
327 }
328
329 fn m2m_connect(
330 _c1: &Self::Cluster,
331 _c1_port: &(),
332 _c2: &Self::Cluster,
333 _c2_port: &(),
334 ) -> Box<dyn FnOnce()> {
335 Box::new(|| {})
336 }
337
338 fn e2o_many_source(
339 _extra_stmts: &mut Vec<syn::Stmt>,
340 _p2: &Self::Process,
341 _p2_port: &(),
342 _codec_type: &syn::Type,
343 _shared_handle: String,
344 ) -> syn::Expr {
345 panic!("EmbeddedDeploy does not support networking (e2o)")
346 }
347
348 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
349 panic!("EmbeddedDeploy does not support networking (e2o)")
350 }
351
352 fn e2o_source(
353 _extra_stmts: &mut Vec<syn::Stmt>,
354 _p1: &Self::External,
355 _p1_port: &(),
356 _p2: &Self::Process,
357 _p2_port: &(),
358 _codec_type: &syn::Type,
359 _shared_handle: String,
360 ) -> syn::Expr {
361 panic!("EmbeddedDeploy does not support networking (e2o)")
362 }
363
364 fn e2o_connect(
365 _p1: &Self::External,
366 _p1_port: &(),
367 _p2: &Self::Process,
368 _p2_port: &(),
369 _many: bool,
370 _server_hint: NetworkHint,
371 ) -> Box<dyn FnOnce()> {
372 panic!("EmbeddedDeploy does not support networking (e2o)")
373 }
374
375 fn o2e_sink(
376 _p1: &Self::Process,
377 _p1_port: &(),
378 _p2: &Self::External,
379 _p2_port: &(),
380 _shared_handle: String,
381 ) -> syn::Expr {
382 panic!("EmbeddedDeploy does not support networking (o2e)")
383 }
384
385 #[expect(
386 unreachable_code,
387 reason = "panic before q! which is only for return type"
388 )]
389 fn cluster_ids(
390 _of_cluster: LocationKey,
391 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
392 panic!("EmbeddedDeploy does not support cluster IDs");
393 q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
394 }
395
396 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
397 super::embedded_runtime::embedded_cluster_self_id()
398 }
399
400 fn cluster_membership_stream(
401 env: &mut Self::InstantiateEnv,
402 at_location: &LocationId,
403 location_id: &LocationId,
404 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
405 {
406 let at_key = match at_location {
407 LocationId::Process(key) | LocationId::Cluster(key) => *key,
408 _ => panic!("cluster_membership_stream must be called from a process or cluster"),
409 };
410 let cluster_key = match location_id {
411 LocationId::Cluster(key) => *key,
412 _ => panic!("cluster_membership_stream target must be a cluster"),
413 };
414 let vec = env.membership_streams.entry(at_key).unwrap().or_default();
415 let idx = if let Some(pos) = vec.iter().position(|k| *k == cluster_key) {
416 pos
417 } else {
418 vec.push(cluster_key);
419 vec.len() - 1
420 };
421
422 super::embedded_runtime::embedded_cluster_membership_stream(idx)
423 }
424
425 fn register_embedded_input(
426 env: &mut Self::InstantiateEnv,
427 location_key: LocationKey,
428 ident: &syn::Ident,
429 element_type: &syn::Type,
430 ) {
431 env.inputs
432 .entry(location_key)
433 .unwrap()
434 .or_default()
435 .push((ident.clone(), element_type.clone()));
436 }
437
438 fn register_embedded_output(
439 env: &mut Self::InstantiateEnv,
440 location_key: LocationKey,
441 ident: &syn::Ident,
442 element_type: &syn::Type,
443 ) {
444 env.outputs
445 .entry(location_key)
446 .unwrap()
447 .or_default()
448 .push((ident.clone(), element_type.clone()));
449 }
450}
451
452impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
453 pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
484 let mut env = EmbeddedInstantiateEnv::default();
485 let compiled = self.compile_internal(&mut env);
486
487 let root = crate::staging_util::get_this_crate();
488 let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
489
490 let mut items: Vec<syn::Item> = Vec::new();
491
492 let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
494 location_keys.sort();
495
496 let fn_names: SparseSecondaryMap<LocationKey, &str> = location_keys
498 .iter()
499 .map(|&k| {
500 let name = self
501 .processes
502 .get(k)
503 .map(|n| n.fn_name.as_str())
504 .or_else(|| self.clusters.get(k).map(|n| n.fn_name.as_str()))
505 .or_else(|| self.externals.get(k).map(|n| n.fn_name.as_str()))
506 .expect("location key not found in any node map");
507 (k, name)
508 })
509 .collect();
510
511 for location_key in location_keys {
512 let graph = &compiled.all_dfir()[location_key];
513
514 let fn_name = fn_names[location_key];
516 let fn_ident = syn::Ident::new(fn_name, Span::call_site());
517
518 let mut loc_inputs = env.inputs.get(location_key).cloned().unwrap_or_default();
520 loc_inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
521
522 let mut loc_outputs = env.outputs.get(location_key).cloned().unwrap_or_default();
524 loc_outputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
525
526 let mut diagnostics = Diagnostics::new();
527 let dfir_tokens = graph
528 .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
529 .expect("DFIR code generation failed with diagnostics.");
530
531 let mut mod_items: Vec<proc_macro2::TokenStream> = Vec::new();
533 let mut extra_fn_generics: Vec<proc_macro2::TokenStream> = Vec::new();
534 let mut cluster_params: Vec<proc_macro2::TokenStream> = Vec::new();
535 let mut output_params: Vec<proc_macro2::TokenStream> = Vec::new();
536 let mut net_out_params: Vec<proc_macro2::TokenStream> = Vec::new();
537 let mut net_in_params: Vec<proc_macro2::TokenStream> = Vec::new();
538 let mut extra_destructure: Vec<proc_macro2::TokenStream> = Vec::new();
539
540 if self.clusters.contains_key(location_key) {
542 cluster_params.push(quote! {
543 __cluster_self_id: &'a #root::location::member_id::TaglessMemberId
544 });
545 let self_id_ident = syn::Ident::new(
547 &format!("__hydro_lang_cluster_self_id_{}", location_key),
548 Span::call_site(),
549 );
550 extra_destructure.push(quote! {
551 let #self_id_ident = __cluster_self_id;
552 });
553 }
554
555 if let Some(loc_memberships) = env.membership_streams.get(location_key) {
557 let membership_struct_ident =
558 syn::Ident::new("EmbeddedMembershipStreams", Span::call_site());
559
560 let mem_generic_idents: Vec<syn::Ident> = loc_memberships
561 .iter()
562 .enumerate()
563 .map(|(i, _)| quote::format_ident!("__Mem{}", i))
564 .collect();
565
566 let mem_field_names: Vec<syn::Ident> = loc_memberships
567 .iter()
568 .map(|k| {
569 let cluster_fn_name = fn_names[*k];
570 syn::Ident::new(cluster_fn_name, Span::call_site())
571 })
572 .collect();
573
574 let struct_fields: Vec<proc_macro2::TokenStream> = mem_field_names
575 .iter()
576 .zip(mem_generic_idents.iter())
577 .map(|(field, generic)| {
578 quote! { pub #field: #generic }
579 })
580 .collect();
581
582 let struct_generics: Vec<proc_macro2::TokenStream> = mem_generic_idents
583 .iter()
584 .map(|generic| {
585 quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin }
586 })
587 .collect();
588
589 for generic in &mem_generic_idents {
590 extra_fn_generics.push(
591 quote! { #generic: __root_dfir_rs::futures::Stream<Item = (#root::location::member_id::TaglessMemberId, #root::location::MembershipEvent)> + Unpin + 'a },
592 );
593 }
594
595 cluster_params.push(quote! {
596 __membership: #fn_ident::#membership_struct_ident<#(#mem_generic_idents),*>
597 });
598
599 for (i, field) in mem_field_names.iter().enumerate() {
600 let var_ident =
601 syn::Ident::new(&format!("__membership_{}", i), Span::call_site());
602 extra_destructure.push(quote! {
603 let #var_ident = __membership.#field;
604 });
605 }
606
607 mod_items.push(quote! {
608 pub struct #membership_struct_ident<#(#struct_generics),*> {
609 #(#struct_fields),*
610 }
611 });
612 }
613
614 let input_params: Vec<proc_macro2::TokenStream> = loc_inputs
616 .iter()
617 .map(|(ident, element_type)| {
618 quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
619 })
620 .collect();
621
622 if !loc_outputs.is_empty() {
624 let output_struct_ident = syn::Ident::new("EmbeddedOutputs", Span::call_site());
625
626 let output_generic_idents: Vec<syn::Ident> = loc_outputs
627 .iter()
628 .enumerate()
629 .map(|(i, _)| quote::format_ident!("__Out{}", i))
630 .collect();
631
632 let struct_fields: Vec<proc_macro2::TokenStream> = loc_outputs
633 .iter()
634 .zip(output_generic_idents.iter())
635 .map(|((ident, _), generic)| {
636 quote! { pub #ident: #generic }
637 })
638 .collect();
639
640 let struct_generics: Vec<proc_macro2::TokenStream> = loc_outputs
641 .iter()
642 .zip(output_generic_idents.iter())
643 .map(|((_, element_type), generic)| {
644 quote! { #generic: FnMut(#element_type) }
645 })
646 .collect();
647
648 for ((_, element_type), generic) in
649 loc_outputs.iter().zip(output_generic_idents.iter())
650 {
651 extra_fn_generics.push(quote! { #generic: FnMut(#element_type) + 'a });
652 }
653
654 output_params.push(quote! {
655 __outputs: &'a mut #fn_ident::#output_struct_ident<#(#output_generic_idents),*>
656 });
657
658 for (ident, _) in &loc_outputs {
659 extra_destructure.push(quote! { let mut #ident = &mut __outputs.#ident; });
660 }
661
662 mod_items.push(quote! {
663 pub struct #output_struct_ident<#(#struct_generics),*> {
664 #(#struct_fields),*
665 }
666 });
667 }
668
669 if let Some(mut loc_net_outputs) = env.network_outputs.remove(location_key) {
671 loc_net_outputs.sort();
672
673 let net_out_struct_ident = syn::Ident::new("EmbeddedNetworkOut", Span::call_site());
674
675 let net_out_generic_idents: Vec<syn::Ident> = loc_net_outputs
676 .iter()
677 .enumerate()
678 .map(|(i, _)| quote::format_ident!("__NetOut{}", i))
679 .collect();
680
681 let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_outputs
682 .iter()
683 .zip(net_out_generic_idents.iter())
684 .map(|((name, _), generic)| {
685 let field_ident = syn::Ident::new(name, Span::call_site());
686 quote! { pub #field_ident: #generic }
687 })
688 .collect();
689
690 let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_outputs
691 .iter()
692 .zip(net_out_generic_idents.iter())
693 .map(|((_, is_tagged), generic)| {
694 if *is_tagged {
695 quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) }
696 } else {
697 quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) }
698 }
699 })
700 .collect();
701
702 for ((_, is_tagged), generic) in
703 loc_net_outputs.iter().zip(net_out_generic_idents.iter())
704 {
705 if *is_tagged {
706 extra_fn_generics.push(
707 quote! { #generic: FnMut((#root::location::member_id::TaglessMemberId, #root::runtime_support::dfir_rs::bytes::Bytes)) + 'a },
708 );
709 } else {
710 extra_fn_generics.push(
711 quote! { #generic: FnMut(#root::runtime_support::dfir_rs::bytes::Bytes) + 'a },
712 );
713 }
714 }
715
716 net_out_params.push(quote! {
717 __network_out: &'a mut #fn_ident::#net_out_struct_ident<#(#net_out_generic_idents),*>
718 });
719
720 for (name, _) in &loc_net_outputs {
721 let field_ident = syn::Ident::new(name, Span::call_site());
722 let var_ident =
723 syn::Ident::new(&format!("__network_out_{name}"), Span::call_site());
724 extra_destructure
725 .push(quote! { let mut #var_ident = &mut __network_out.#field_ident; });
726 }
727
728 mod_items.push(quote! {
729 pub struct #net_out_struct_ident<#(#struct_generics),*> {
730 #(#struct_fields),*
731 }
732 });
733 }
734
735 if let Some(mut loc_net_inputs) = env.network_inputs.remove(location_key) {
737 loc_net_inputs.sort();
738
739 let net_in_struct_ident = syn::Ident::new("EmbeddedNetworkIn", Span::call_site());
740
741 let net_in_generic_idents: Vec<syn::Ident> = loc_net_inputs
742 .iter()
743 .enumerate()
744 .map(|(i, _)| quote::format_ident!("__NetIn{}", i))
745 .collect();
746
747 let struct_fields: Vec<proc_macro2::TokenStream> = loc_net_inputs
748 .iter()
749 .zip(net_in_generic_idents.iter())
750 .map(|((name, _), generic)| {
751 let field_ident = syn::Ident::new(name, Span::call_site());
752 quote! { pub #field_ident: #generic }
753 })
754 .collect();
755
756 let struct_generics: Vec<proc_macro2::TokenStream> = loc_net_inputs
757 .iter()
758 .zip(net_in_generic_idents.iter())
759 .map(|((_, is_tagged), generic)| {
760 if *is_tagged {
761 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin }
762 } else {
763 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin }
764 }
765 })
766 .collect();
767
768 for ((_, is_tagged), generic) in
769 loc_net_inputs.iter().zip(net_in_generic_idents.iter())
770 {
771 if *is_tagged {
772 extra_fn_generics.push(
773 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<(#root::location::member_id::TaglessMemberId, __root_dfir_rs::bytes::BytesMut), std::io::Error>> + Unpin + 'a },
774 );
775 } else {
776 extra_fn_generics.push(
777 quote! { #generic: __root_dfir_rs::futures::Stream<Item = Result<__root_dfir_rs::bytes::BytesMut, std::io::Error>> + Unpin + 'a },
778 );
779 }
780 }
781
782 net_in_params.push(quote! {
783 __network_in: #fn_ident::#net_in_struct_ident<#(#net_in_generic_idents),*>
784 });
785
786 for (name, _) in &loc_net_inputs {
787 let field_ident = syn::Ident::new(name, Span::call_site());
788 let var_ident =
789 syn::Ident::new(&format!("__network_in_{name}"), Span::call_site());
790 extra_destructure.push(quote! { let #var_ident = __network_in.#field_ident; });
791 }
792
793 mod_items.push(quote! {
794 pub struct #net_in_struct_ident<#(#struct_generics),*> {
795 #(#struct_fields),*
796 }
797 });
798 }
799
800 if !mod_items.is_empty() {
802 let output_mod: syn::Item = syn::parse_quote! {
803 pub mod #fn_ident {
804 use super::*;
805 #(#mod_items)*
806 }
807 };
808 items.push(output_mod);
809 }
810
811 let all_params: Vec<proc_macro2::TokenStream> = cluster_params
813 .into_iter()
814 .chain(input_params)
815 .chain(output_params)
816 .chain(net_in_params)
817 .chain(net_out_params)
818 .collect();
819
820 let func = if !extra_fn_generics.is_empty() {
821 syn::parse_quote! {
822 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
823 pub fn #fn_ident<'a, #(#extra_fn_generics),*>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
824 #(#extra_destructure)*
825 #dfir_tokens
826 }
827 }
828 } else {
829 syn::parse_quote! {
830 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
831 pub fn #fn_ident<'a>(#(#all_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
832 #dfir_tokens
833 }
834 }
835 };
836
837 items.push(func);
838 }
839
840 syn::parse_quote! {
841 use #orig_crate_name::__staged::__deps::*;
842 use #root::prelude::*;
843 use #root::runtime_support::dfir_rs as __root_dfir_rs;
844 pub use #orig_crate_name::__staged;
845
846 #( #items )*
847 }
848 }
849}