Skip to main content

hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use slotmap::SparseSecondaryMap;
25use stageleft::{QuotedWithContext, RuntimeData};
26use syn::parse_quote;
27
28use super::deploy_runtime::*;
29use crate::compile::builder::ExternalPortId;
30use crate::compile::deploy_provider::{
31    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::compile::trybuild::generate::{
34    HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
35};
36use crate::location::dynamic::LocationId;
37use crate::location::member_id::TaglessMemberId;
38use crate::location::{LocationKey, MembershipEvent, NetworkHint};
39use crate::staging_util::get_this_crate;
40
/// Deployment backend that uses [`hydro_deploy`] for provisioning and launching.
///
/// Automatically used when you call [`crate::compile::builder::FlowBuilder::deploy`] and pass in
/// an `&mut` reference to [`hydro_deploy::Deployment`] as the deployment context.
///
/// The enum has no variants, so no value of it can be constructed; it is the type on which the
/// [`Deploy`] trait is implemented for this backend.
pub enum HydroDeploy {}
46
47impl<'a> Deploy<'a> for HydroDeploy {
48    /// Map from Cluster location ID to member IDs.
49    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
50    type InstantiateEnv = Deployment;
51
52    type Process = DeployNode;
53    type Cluster = DeployCluster;
54    type External = DeployExternal;
55
56    fn o2o_sink_source(
57        _env: &mut Self::InstantiateEnv,
58        _p1: &Self::Process,
59        p1_port: &<Self::Process as Node>::Port,
60        _p2: &Self::Process,
61        p2_port: &<Self::Process as Node>::Port,
62        _name: Option<&str>,
63    ) -> (syn::Expr, syn::Expr) {
64        let p1_port = p1_port.as_str();
65        let p2_port = p2_port.as_str();
66        deploy_o2o(
67            RuntimeData::new("__hydro_lang_trybuild_cli"),
68            p1_port,
69            p2_port,
70        )
71    }
72
73    fn o2o_connect(
74        p1: &Self::Process,
75        p1_port: &<Self::Process as Node>::Port,
76        p2: &Self::Process,
77        p2_port: &<Self::Process as Node>::Port,
78    ) -> Box<dyn FnOnce()> {
79        let p1 = p1.clone();
80        let p1_port = p1_port.clone();
81        let p2 = p2.clone();
82        let p2_port = p2_port.clone();
83
84        Box::new(move || {
85            let self_underlying_borrow = p1.underlying.borrow();
86            let self_underlying = self_underlying_borrow.as_ref().unwrap();
87            let source_port = self_underlying.get_port(p1_port.clone());
88
89            let other_underlying_borrow = p2.underlying.borrow();
90            let other_underlying = other_underlying_borrow.as_ref().unwrap();
91            let recipient_port = other_underlying.get_port(p2_port.clone());
92
93            source_port.send_to(&recipient_port)
94        })
95    }
96
97    fn o2m_sink_source(
98        _env: &mut Self::InstantiateEnv,
99        _p1: &Self::Process,
100        p1_port: &<Self::Process as Node>::Port,
101        _c2: &Self::Cluster,
102        c2_port: &<Self::Cluster as Node>::Port,
103        _name: Option<&str>,
104    ) -> (syn::Expr, syn::Expr) {
105        let p1_port = p1_port.as_str();
106        let c2_port = c2_port.as_str();
107        deploy_o2m(
108            RuntimeData::new("__hydro_lang_trybuild_cli"),
109            p1_port,
110            c2_port,
111        )
112    }
113
114    fn o2m_connect(
115        p1: &Self::Process,
116        p1_port: &<Self::Process as Node>::Port,
117        c2: &Self::Cluster,
118        c2_port: &<Self::Cluster as Node>::Port,
119    ) -> Box<dyn FnOnce()> {
120        let p1 = p1.clone();
121        let p1_port = p1_port.clone();
122        let c2 = c2.clone();
123        let c2_port = c2_port.clone();
124
125        Box::new(move || {
126            let self_underlying_borrow = p1.underlying.borrow();
127            let self_underlying = self_underlying_borrow.as_ref().unwrap();
128            let source_port = self_underlying.get_port(p1_port.clone());
129
130            let recipient_port = DemuxSink {
131                demux: c2
132                    .members
133                    .borrow()
134                    .iter()
135                    .enumerate()
136                    .map(|(id, c)| {
137                        (
138                            id as u32,
139                            Arc::new(c.underlying.get_port(c2_port.clone()))
140                                as Arc<dyn RustCrateSink + 'static>,
141                        )
142                    })
143                    .collect(),
144            };
145
146            source_port.send_to(&recipient_port)
147        })
148    }
149
150    fn m2o_sink_source(
151        _env: &mut Self::InstantiateEnv,
152        _c1: &Self::Cluster,
153        c1_port: &<Self::Cluster as Node>::Port,
154        _p2: &Self::Process,
155        p2_port: &<Self::Process as Node>::Port,
156        _name: Option<&str>,
157    ) -> (syn::Expr, syn::Expr) {
158        let c1_port = c1_port.as_str();
159        let p2_port = p2_port.as_str();
160        deploy_m2o(
161            RuntimeData::new("__hydro_lang_trybuild_cli"),
162            c1_port,
163            p2_port,
164        )
165    }
166
167    fn m2o_connect(
168        c1: &Self::Cluster,
169        c1_port: &<Self::Cluster as Node>::Port,
170        p2: &Self::Process,
171        p2_port: &<Self::Process as Node>::Port,
172    ) -> Box<dyn FnOnce()> {
173        let c1 = c1.clone();
174        let c1_port = c1_port.clone();
175        let p2 = p2.clone();
176        let p2_port = p2_port.clone();
177
178        Box::new(move || {
179            let other_underlying_borrow = p2.underlying.borrow();
180            let other_underlying = other_underlying_borrow.as_ref().unwrap();
181            let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
182
183            for (i, node) in c1.members.borrow().iter().enumerate() {
184                let source_port = node.underlying.get_port(c1_port.clone());
185
186                TaggedSource {
187                    source: Arc::new(source_port),
188                    tag: i as u32,
189                }
190                .send_to(&recipient_port);
191            }
192        })
193    }
194
195    fn m2m_sink_source(
196        _env: &mut Self::InstantiateEnv,
197        _c1: &Self::Cluster,
198        c1_port: &<Self::Cluster as Node>::Port,
199        _c2: &Self::Cluster,
200        c2_port: &<Self::Cluster as Node>::Port,
201        _name: Option<&str>,
202    ) -> (syn::Expr, syn::Expr) {
203        let c1_port = c1_port.as_str();
204        let c2_port = c2_port.as_str();
205        deploy_m2m(
206            RuntimeData::new("__hydro_lang_trybuild_cli"),
207            c1_port,
208            c2_port,
209        )
210    }
211
212    fn m2m_connect(
213        c1: &Self::Cluster,
214        c1_port: &<Self::Cluster as Node>::Port,
215        c2: &Self::Cluster,
216        c2_port: &<Self::Cluster as Node>::Port,
217    ) -> Box<dyn FnOnce()> {
218        let c1 = c1.clone();
219        let c1_port = c1_port.clone();
220        let c2 = c2.clone();
221        let c2_port = c2_port.clone();
222
223        Box::new(move || {
224            for (i, sender) in c1.members.borrow().iter().enumerate() {
225                let source_port = sender.underlying.get_port(c1_port.clone());
226
227                let recipient_port = DemuxSink {
228                    demux: c2
229                        .members
230                        .borrow()
231                        .iter()
232                        .enumerate()
233                        .map(|(id, c)| {
234                            (
235                                id as u32,
236                                Arc::new(c.underlying.get_port(c2_port.clone()).merge())
237                                    as Arc<dyn RustCrateSink + 'static>,
238                            )
239                        })
240                        .collect(),
241                };
242
243                TaggedSource {
244                    source: Arc::new(source_port),
245                    tag: i as u32,
246                }
247                .send_to(&recipient_port);
248            }
249        })
250    }
251
252    fn e2o_many_source(
253        extra_stmts: &mut Vec<syn::Stmt>,
254        _p2: &Self::Process,
255        p2_port: &<Self::Process as Node>::Port,
256        codec_type: &syn::Type,
257        shared_handle: String,
258    ) -> syn::Expr {
259        let connect_ident = syn::Ident::new(
260            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
261            Span::call_site(),
262        );
263        let source_ident = syn::Ident::new(
264            &format!("__hydro_deploy_many_{}_source", &shared_handle),
265            Span::call_site(),
266        );
267        let sink_ident = syn::Ident::new(
268            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
269            Span::call_site(),
270        );
271        let membership_ident = syn::Ident::new(
272            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
273            Span::call_site(),
274        );
275
276        let root = get_this_crate();
277
278        extra_stmts.push(syn::parse_quote! {
279            let #connect_ident = __hydro_lang_trybuild_cli
280                .port(#p2_port)
281                .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
282        });
283
284        extra_stmts.push(syn::parse_quote! {
285            let #source_ident = #connect_ident.source;
286        });
287
288        extra_stmts.push(syn::parse_quote! {
289            let #sink_ident = #connect_ident.sink;
290        });
291
292        extra_stmts.push(syn::parse_quote! {
293            let #membership_ident = #connect_ident.membership;
294        });
295
296        parse_quote!(#source_ident)
297    }
298
299    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
300        let sink_ident = syn::Ident::new(
301            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
302            Span::call_site(),
303        );
304        parse_quote!(#sink_ident)
305    }
306
307    fn e2o_source(
308        extra_stmts: &mut Vec<syn::Stmt>,
309        _p1: &Self::External,
310        _p1_port: &<Self::External as Node>::Port,
311        _p2: &Self::Process,
312        p2_port: &<Self::Process as Node>::Port,
313        codec_type: &syn::Type,
314        shared_handle: String,
315    ) -> syn::Expr {
316        let connect_ident = syn::Ident::new(
317            &format!("__hydro_deploy_{}_connect", &shared_handle),
318            Span::call_site(),
319        );
320        let source_ident = syn::Ident::new(
321            &format!("__hydro_deploy_{}_source", &shared_handle),
322            Span::call_site(),
323        );
324        let sink_ident = syn::Ident::new(
325            &format!("__hydro_deploy_{}_sink", &shared_handle),
326            Span::call_site(),
327        );
328
329        let root = get_this_crate();
330
331        extra_stmts.push(syn::parse_quote! {
332            let #connect_ident = __hydro_lang_trybuild_cli
333                .port(#p2_port)
334                .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
335        });
336
337        extra_stmts.push(syn::parse_quote! {
338            let #source_ident = #connect_ident.source;
339        });
340
341        extra_stmts.push(syn::parse_quote! {
342            let #sink_ident = #connect_ident.sink;
343        });
344
345        parse_quote!(#source_ident)
346    }
347
348    fn e2o_connect(
349        p1: &Self::External,
350        p1_port: &<Self::External as Node>::Port,
351        p2: &Self::Process,
352        p2_port: &<Self::Process as Node>::Port,
353        _many: bool,
354        server_hint: NetworkHint,
355    ) -> Box<dyn FnOnce()> {
356        let p1 = p1.clone();
357        let p1_port = p1_port.clone();
358        let p2 = p2.clone();
359        let p2_port = p2_port.clone();
360
361        Box::new(move || {
362            let self_underlying_borrow = p1.underlying.borrow();
363            let self_underlying = self_underlying_borrow.as_ref().unwrap();
364            let source_port = self_underlying.declare_many_client();
365
366            let other_underlying_borrow = p2.underlying.borrow();
367            let other_underlying = other_underlying_borrow.as_ref().unwrap();
368            let recipient_port = other_underlying.get_port_with_hint(
369                p2_port.clone(),
370                match server_hint {
371                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
372                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
373                },
374            );
375
376            source_port.send_to(&recipient_port);
377
378            p1.client_ports
379                .borrow_mut()
380                .insert(p1_port.clone(), source_port);
381        })
382    }
383
384    fn o2e_sink(
385        _p1: &Self::Process,
386        _p1_port: &<Self::Process as Node>::Port,
387        _p2: &Self::External,
388        _p2_port: &<Self::External as Node>::Port,
389        shared_handle: String,
390    ) -> syn::Expr {
391        let sink_ident = syn::Ident::new(
392            &format!("__hydro_deploy_{}_sink", &shared_handle),
393            Span::call_site(),
394        );
395        parse_quote!(#sink_ident)
396    }
397
398    fn cluster_ids(
399        of_cluster: LocationKey,
400    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
401        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
402    }
403
404    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
405        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
406    }
407
    /// Returns the quoted membership stream for `location_id` by delegating to the free
    /// function `cluster_membership_stream`.
    ///
    /// The environment and `_at_location` are not used here.
    fn cluster_membership_stream(
        _env: &mut Self::InstantiateEnv,
        _at_location: &LocationId,
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
    {
        cluster_membership_stream(location_id)
    }
416}
417
418#[expect(missing_docs, reason = "TODO")]
419pub trait DeployCrateWrapper {
420    fn underlying(&self) -> Arc<RustCrateService>;
421
422    fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
423        self.underlying().stdout()
424    }
425
426    fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
427        self.underlying().stderr()
428    }
429
430    fn stdout_filter(
431        &self,
432        prefix: impl Into<String>,
433    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
434        self.underlying().stdout_filter(prefix.into())
435    }
436
437    fn stderr_filter(
438        &self,
439        prefix: impl Into<String>,
440    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
441        self.underlying().stderr_filter(prefix.into())
442    }
443}
444
// Process spec for this backend: a host plus optional build settings, assembled through the
// builder-style methods on `TrybuildHost`.
// Plain `//` comments are used so that the `missing_docs` expectation stays fulfilled.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct TrybuildHost {
    // Host the resulting service is added to; its target type also selects the linking mode.
    host: Arc<dyn Host>,
    // The optional settings below are set at most once each via the matching builder method.
    // NOTE(review): they are presumably consumed by `create_trybuild_service` — confirm there.
    display_name: Option<String>,
    rustflags: Option<String>,
    profile: Option<String>,
    // Replaced wholesale by `additional_hydro_features(..)`.
    additional_hydro_features: Vec<String>,
    // Appended to by `features(..)`.
    features: Vec<String>,
    tracing: Option<TracingOptions>,
    // `(key, value)` pairs appended by `build_env(..)`.
    build_envs: Vec<(String, String)>,
    // Passed to `create_graph_trybuild` when a process is instantiated.
    name_hint: Option<String>,
    // NOTE(review): always `None` in the constructors visible here; presumably filled in for
    // cluster members elsewhere — confirm.
    cluster_idx: Option<usize>,
}
459
460impl From<Arc<dyn Host>> for TrybuildHost {
461    fn from(host: Arc<dyn Host>) -> Self {
462        Self {
463            host,
464            display_name: None,
465            rustflags: None,
466            profile: None,
467            additional_hydro_features: vec![],
468            features: vec![],
469            tracing: None,
470            build_envs: vec![],
471            name_hint: None,
472            cluster_idx: None,
473        }
474    }
475}
476
477impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
478    fn from(host: Arc<H>) -> Self {
479        Self {
480            host,
481            display_name: None,
482            rustflags: None,
483            profile: None,
484            additional_hydro_features: vec![],
485            features: vec![],
486            tracing: None,
487            build_envs: vec![],
488            name_hint: None,
489            cluster_idx: None,
490        }
491    }
492}
493
494#[expect(missing_docs, reason = "TODO")]
495impl TrybuildHost {
496    pub fn new(host: Arc<dyn Host>) -> Self {
497        Self {
498            host,
499            display_name: None,
500            rustflags: None,
501            profile: None,
502            additional_hydro_features: vec![],
503            features: vec![],
504            tracing: None,
505            build_envs: vec![],
506            name_hint: None,
507            cluster_idx: None,
508        }
509    }
510
511    pub fn display_name(self, display_name: impl Into<String>) -> Self {
512        if self.display_name.is_some() {
513            panic!("{} already set", name_of!(display_name in Self));
514        }
515
516        Self {
517            display_name: Some(display_name.into()),
518            ..self
519        }
520    }
521
522    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
523        if self.rustflags.is_some() {
524            panic!("{} already set", name_of!(rustflags in Self));
525        }
526
527        Self {
528            rustflags: Some(rustflags.into()),
529            ..self
530        }
531    }
532
533    pub fn profile(self, profile: impl Into<String>) -> Self {
534        if self.profile.is_some() {
535            panic!("{} already set", name_of!(profile in Self));
536        }
537
538        Self {
539            profile: Some(profile.into()),
540            ..self
541        }
542    }
543
544    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
545        Self {
546            additional_hydro_features,
547            ..self
548        }
549    }
550
551    pub fn features(self, features: Vec<String>) -> Self {
552        Self {
553            features: self.features.into_iter().chain(features).collect(),
554            ..self
555        }
556    }
557
558    pub fn tracing(self, tracing: TracingOptions) -> Self {
559        if self.tracing.is_some() {
560            panic!("{} already set", name_of!(tracing in Self));
561        }
562
563        Self {
564            tracing: Some(tracing),
565            ..self
566        }
567    }
568
569    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
570        Self {
571            build_envs: self
572                .build_envs
573                .into_iter()
574                .chain(std::iter::once((key.into(), value.into())))
575                .collect(),
576            ..self
577        }
578    }
579}
580
581impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
582    type ProcessSpec = TrybuildHost;
583    fn into_process_spec(self) -> TrybuildHost {
584        TrybuildHost {
585            host: self,
586            display_name: None,
587            rustflags: None,
588            profile: None,
589            additional_hydro_features: vec![],
590            features: vec![],
591            tracing: None,
592            build_envs: vec![],
593            name_hint: None,
594            cluster_idx: None,
595        }
596    }
597}
598
599impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
600    type ProcessSpec = TrybuildHost;
601    fn into_process_spec(self) -> TrybuildHost {
602        TrybuildHost {
603            host: self,
604            display_name: None,
605            rustflags: None,
606            profile: None,
607            additional_hydro_features: vec![],
608            features: vec![],
609            tracing: None,
610            build_envs: vec![],
611            name_hint: None,
612            cluster_idx: None,
613        }
614    }
615}
616
// Handle for an external node. Every field is behind an `Rc`/`Arc`, so clones of this handle
// share the same state.
// Plain `//` comments are used so that the `missing_docs` expectation stays fulfilled.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployExternal {
    // Counter behind `next_port()`, which yields `port_0`, `port_1`, ...
    next_port: Rc<RefCell<usize>>,
    // Host handed to the deployment when the custom service is created in `instantiate`.
    host: Arc<dyn Host>,
    // `None` until `instantiate` stores the created service.
    underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
    // Port name -> client port, filled in when an `e2o_connect` closure runs.
    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
    // External port id -> port name, filled in by `register`.
    allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
}
626
627impl DeployExternal {
628    pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
629        self.client_ports
630            .borrow()
631            .get(
632                self.allocated_ports
633                    .borrow()
634                    .get(&external_port_id)
635                    .unwrap(),
636            )
637            .unwrap()
638            .clone()
639    }
640}
641
642impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
643    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
644        assert!(
645            self.allocated_ports
646                .borrow_mut()
647                .insert(external_port_id, port.clone())
648                .is_none_or(|old| old == port)
649        );
650    }
651
652    fn as_bytes_bidi(
653        &self,
654        external_port_id: ExternalPortId,
655    ) -> impl Future<
656        Output = (
657            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
658            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
659        ),
660    > + 'a {
661        let port = self.raw_port(external_port_id);
662
663        async move {
664            let (source, sink) = port.connect().await.into_source_sink();
665            (
666                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
667                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
668            )
669        }
670    }
671
672    fn as_bincode_bidi<InT, OutT>(
673        &self,
674        external_port_id: ExternalPortId,
675    ) -> impl Future<
676        Output = (
677            Pin<Box<dyn Stream<Item = OutT>>>,
678            Pin<Box<dyn Sink<InT, Error = Error>>>,
679        ),
680    > + 'a
681    where
682        InT: Serialize + 'static,
683        OutT: DeserializeOwned + 'static,
684    {
685        let port = self.raw_port(external_port_id);
686        async move {
687            let (source, sink) = port.connect().await.into_source_sink();
688            (
689                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
690                    as Pin<Box<dyn Stream<Item = OutT>>>,
691                Box::pin(
692                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
693                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
694            )
695        }
696    }
697
698    fn as_bincode_sink<T: Serialize + 'static>(
699        &self,
700        external_port_id: ExternalPortId,
701    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
702        let port = self.raw_port(external_port_id);
703        async move {
704            let sink = port.connect().await.into_sink();
705            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
706                as Pin<Box<dyn Sink<T, Error = Error>>>
707        }
708    }
709
710    fn as_bincode_source<T: DeserializeOwned + 'static>(
711        &self,
712        external_port_id: ExternalPortId,
713    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
714        let port = self.raw_port(external_port_id);
715        async move {
716            let source = port.connect().await.into_source();
717            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
718                as Pin<Box<dyn Stream<Item = T>>>
719        }
720    }
721}
722
723impl Node for DeployExternal {
724    type Port = String;
725    /// Map from Cluster location ID to member IDs.
726    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
727    type InstantiateEnv = Deployment;
728
729    fn next_port(&self) -> Self::Port {
730        let next_port = *self.next_port.borrow();
731        *self.next_port.borrow_mut() += 1;
732
733        format!("port_{}", next_port)
734    }
735
736    fn instantiate(
737        &self,
738        env: &mut Self::InstantiateEnv,
739        _meta: &mut Self::Meta,
740        _graph: DfirGraph,
741        extra_stmts: &[syn::Stmt],
742        sidecars: &[syn::Expr],
743    ) {
744        assert!(extra_stmts.is_empty());
745        assert!(sidecars.is_empty());
746        let service = env.CustomService(self.host.clone(), vec![]);
747        *self.underlying.borrow_mut() = Some(service);
748    }
749
750    fn update_meta(&self, _meta: &Self::Meta) {}
751}
752
753impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
754    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
755        DeployExternal {
756            next_port: Rc::new(RefCell::new(0)),
757            host: self,
758            underlying: Rc::new(RefCell::new(None)),
759            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
760            client_ports: Rc::new(RefCell::new(HashMap::new())),
761        }
762    }
763}
764
765impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
766    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
767        DeployExternal {
768            next_port: Rc::new(RefCell::new(0)),
769            host: self,
770            underlying: Rc::new(RefCell::new(None)),
771            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
772            client_ports: Rc::new(RefCell::new(HashMap::new())),
773        }
774    }
775}
776
/// How the service for a process or cluster member is obtained at instantiation time.
pub(crate) enum CrateOrTrybuild {
    /// A ready-made [`RustCrate`] and the host it is added to; used as-is.
    Crate(RustCrate, Arc<dyn Host>),
    /// A [`TrybuildHost`] spec; the crate is generated from the dataflow graph at
    /// instantiation time.
    Trybuild(TrybuildHost),
}
781
// Handle for a single process. Every field is behind an `Rc`, so clones of this handle share
// the same state.
// Plain `//` comments are used so that the `missing_docs` expectation stays fulfilled.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployNode {
    // Counter behind `next_port()`, which yields `port_0`, `port_1`, ...
    next_port: Rc<RefCell<usize>>,
    // Taken (left as `None`) by `instantiate`, which therefore can only succeed once.
    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
    // `None` until `instantiate` stores the service added to the deployment.
    underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
}
789
790impl DeployCrateWrapper for DeployNode {
791    fn underlying(&self) -> Arc<RustCrateService> {
792        Arc::clone(self.underlying.borrow().as_ref().unwrap())
793    }
794}
795
796impl Node for DeployNode {
797    type Port = String;
798    /// Map from Cluster location ID to member IDs.
799    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
800    type InstantiateEnv = Deployment;
801
802    fn next_port(&self) -> String {
803        let next_port = *self.next_port.borrow();
804        *self.next_port.borrow_mut() += 1;
805
806        format!("port_{}", next_port)
807    }
808
809    fn update_meta(&self, meta: &Self::Meta) {
810        let underlying_node = self.underlying.borrow();
811        underlying_node.as_ref().unwrap().update_meta(HydroMeta {
812            clusters: meta.clone(),
813            cluster_id: None,
814        });
815    }
816
817    fn instantiate(
818        &self,
819        env: &mut Self::InstantiateEnv,
820        _meta: &mut Self::Meta,
821        graph: DfirGraph,
822        extra_stmts: &[syn::Stmt],
823        sidecars: &[syn::Expr],
824    ) {
825        let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
826            CrateOrTrybuild::Crate(c, host) => (c, host),
827            CrateOrTrybuild::Trybuild(trybuild) => {
828                // Determine linking mode based on host target type
829                let linking_mode = if !cfg!(target_os = "windows")
830                    && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
831                {
832                    // When compiling for local, prefer dynamic linking to reduce binary size
833                    // Windows is currently not supported due to https://github.com/bevyengine/bevy/pull/2016
834                    LinkingMode::Dynamic
835                } else {
836                    LinkingMode::Static
837                };
838                let (bin_name, config) = create_graph_trybuild(
839                    graph,
840                    extra_stmts,
841                    sidecars,
842                    trybuild.name_hint.as_deref(),
843                    crate::compile::trybuild::generate::DeployMode::HydroDeploy,
844                    linking_mode,
845                );
846                let host = trybuild.host.clone();
847                (
848                    create_trybuild_service(
849                        trybuild,
850                        &config.project_dir,
851                        &config.target_dir,
852                        config.features.as_deref(),
853                        &bin_name,
854                        &config.linking_mode,
855                    ),
856                    host,
857                )
858            }
859        };
860
861        *self.underlying.borrow_mut() = Some(env.add_service(service, host));
862    }
863}
864
// Handle for one member of a cluster; wraps that member's service.
// Plain `//` comments are used so that the `missing_docs` expectation stays fulfilled.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployClusterNode {
    // Shared handle to the member's service; returned by `DeployCrateWrapper::underlying`.
    underlying: Arc<RustCrateService>,
}
870
871impl DeployCrateWrapper for DeployClusterNode {
872    fn underlying(&self) -> Arc<RustCrateService> {
873        self.underlying.clone()
874    }
875}
// Handle for a cluster. The port counter, spec list and member list are behind `Rc`s, so
// clones of this handle share them.
// Plain `//` comments are used so that the `missing_docs` expectation stays fulfilled.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployCluster {
    // NOTE(review): presumably the location key identifying this cluster — confirm at its uses.
    key: LocationKey,
    // Counter behind `next_port()`, which yields `port_0`, `port_1`, ...
    next_port: Rc<RefCell<usize>>,
    // One spec per member; taken (left as `None`) by `instantiate`.
    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
    // Member handles, exposed through `members()` and used by the `*_connect` closures.
    members: Rc<RefCell<Vec<DeployClusterNode>>>,
    // Passed to `create_graph_trybuild` when the cluster is instantiated.
    name_hint: Option<String>,
}
885
886impl DeployCluster {
887    #[expect(missing_docs, reason = "TODO")]
888    pub fn members(&self) -> Vec<DeployClusterNode> {
889        self.members.borrow().clone()
890    }
891}
892
893impl Node for DeployCluster {
894    type Port = String;
895    /// Map from Cluster location ID to member IDs.
896    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
897    type InstantiateEnv = Deployment;
898
899    fn next_port(&self) -> String {
900        let next_port = *self.next_port.borrow();
901        *self.next_port.borrow_mut() += 1;
902
903        format!("port_{}", next_port)
904    }
905
906    fn instantiate(
907        &self,
908        env: &mut Self::InstantiateEnv,
909        meta: &mut Self::Meta,
910        graph: DfirGraph,
911        extra_stmts: &[syn::Stmt],
912        sidecars: &[syn::Expr],
913    ) {
914        let has_trybuild = self
915            .cluster_spec
916            .borrow()
917            .as_ref()
918            .unwrap()
919            .iter()
920            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
921
922        // For clusters, use static linking if ANY host is non-local (conservative approach)
923        let linking_mode = if !cfg!(target_os = "windows")
924            && self
925                .cluster_spec
926                .borrow()
927                .as_ref()
928                .unwrap()
929                .iter()
930                .all(|spec| match spec {
931                    CrateOrTrybuild::Crate(_, _) => true, // crates handle their own linking
932                    CrateOrTrybuild::Trybuild(t) => {
933                        t.host.target_type() == hydro_deploy::HostTargetType::Local
934                    }
935                }) {
936            // See comment above for Windows exception
937            LinkingMode::Dynamic
938        } else {
939            LinkingMode::Static
940        };
941
942        let maybe_trybuild = if has_trybuild {
943            Some(create_graph_trybuild(
944                graph,
945                extra_stmts,
946                sidecars,
947                self.name_hint.as_deref(),
948                crate::compile::trybuild::generate::DeployMode::HydroDeploy,
949                linking_mode,
950            ))
951        } else {
952            None
953        };
954
955        let cluster_nodes = self
956            .cluster_spec
957            .borrow_mut()
958            .take()
959            .unwrap()
960            .into_iter()
961            .map(|spec| {
962                let (service, host) = match spec {
963                    CrateOrTrybuild::Crate(c, host) => (c, host),
964                    CrateOrTrybuild::Trybuild(trybuild) => {
965                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
966                        let host = trybuild.host.clone();
967                        (
968                            create_trybuild_service(
969                                trybuild,
970                                &config.project_dir,
971                                &config.target_dir,
972                                config.features.as_deref(),
973                                bin_name,
974                                &config.linking_mode,
975                            ),
976                            host,
977                        )
978                    }
979                };
980
981                env.add_service(service, host)
982            })
983            .collect::<Vec<_>>();
984        meta.insert(
985            self.key,
986            (0..(cluster_nodes.len() as u32))
987                .map(TaglessMemberId::from_raw_id)
988                .collect(),
989        );
990        *self.members.borrow_mut() = cluster_nodes
991            .into_iter()
992            .map(|n| DeployClusterNode { underlying: n })
993            .collect();
994    }
995
996    fn update_meta(&self, meta: &Self::Meta) {
997        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
998            node.underlying.update_meta(HydroMeta {
999                clusters: meta.clone(),
1000                cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
1001            });
1002        }
1003    }
1004}
1005
1006#[expect(missing_docs, reason = "TODO")]
1007#[derive(Clone)]
1008pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
1009
1010impl DeployProcessSpec {
1011    #[expect(missing_docs, reason = "TODO")]
1012    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
1013        Self(t, host)
1014    }
1015}
1016
1017impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
1018    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployNode {
1019        DeployNode {
1020            next_port: Rc::new(RefCell::new(0)),
1021            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1022            underlying: Rc::new(RefCell::new(None)),
1023        }
1024    }
1025}
1026
1027impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1028    fn build(mut self, key: LocationKey, name_hint: &str) -> DeployNode {
1029        self.name_hint = Some(format!("{} (process {})", name_hint, key));
1030        DeployNode {
1031            next_port: Rc::new(RefCell::new(0)),
1032            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1033            underlying: Rc::new(RefCell::new(None)),
1034        }
1035    }
1036}
1037
1038#[expect(missing_docs, reason = "TODO")]
1039#[derive(Clone)]
1040pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1041
1042impl DeployClusterSpec {
1043    #[expect(missing_docs, reason = "TODO")]
1044    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1045        Self(crates)
1046    }
1047}
1048
1049impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1050    fn build(self, key: LocationKey, _name_hint: &str) -> DeployCluster {
1051        DeployCluster {
1052            key,
1053            next_port: Rc::new(RefCell::new(0)),
1054            cluster_spec: Rc::new(RefCell::new(Some(
1055                self.0
1056                    .into_iter()
1057                    .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1058                    .collect(),
1059            ))),
1060            members: Rc::new(RefCell::new(vec![])),
1061            name_hint: None,
1062        }
1063    }
1064}
1065
1066impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1067    fn build(self, key: LocationKey, name_hint: &str) -> DeployCluster {
1068        let name_hint = format!("{} (cluster {})", name_hint, key);
1069        DeployCluster {
1070            key,
1071            next_port: Rc::new(RefCell::new(0)),
1072            cluster_spec: Rc::new(RefCell::new(Some(
1073                self.into_iter()
1074                    .enumerate()
1075                    .map(|(idx, b)| {
1076                        let mut b = b.into();
1077                        b.name_hint = Some(name_hint.clone());
1078                        b.cluster_idx = Some(idx);
1079                        CrateOrTrybuild::Trybuild(b)
1080                    })
1081                    .collect(),
1082            ))),
1083            members: Rc::new(RefCell::new(vec![])),
1084            name_hint: Some(name_hint),
1085        }
1086    }
1087}
1088
1089fn create_trybuild_service(
1090    trybuild: TrybuildHost,
1091    dir: &std::path::Path,
1092    target_dir: &std::path::PathBuf,
1093    features: Option<&[String]>,
1094    bin_name: &str,
1095    linking_mode: &LinkingMode,
1096) -> RustCrate {
1097    // For dynamic linking, use the dylib-examples crate; for static, use the base crate
1098    let crate_dir = match linking_mode {
1099        LinkingMode::Dynamic => dir.join("dylib-examples"),
1100        LinkingMode::Static => dir.to_path_buf(),
1101    };
1102
1103    let mut ret = RustCrate::new(&crate_dir, dir)
1104        .target_dir(target_dir)
1105        .example(bin_name)
1106        .no_default_features();
1107
1108    ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1109
1110    if let Some(display_name) = trybuild.display_name {
1111        ret = ret.display_name(display_name);
1112    } else if let Some(name_hint) = trybuild.name_hint {
1113        if let Some(cluster_idx) = trybuild.cluster_idx {
1114            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1115        } else {
1116            ret = ret.display_name(name_hint);
1117        }
1118    }
1119
1120    if let Some(rustflags) = trybuild.rustflags {
1121        ret = ret.rustflags(rustflags);
1122    }
1123
1124    if let Some(profile) = trybuild.profile {
1125        ret = ret.profile(profile);
1126    }
1127
1128    if let Some(tracing) = trybuild.tracing {
1129        ret = ret.tracing(tracing);
1130    }
1131
1132    ret = ret.features(
1133        vec!["hydro___feature_deploy_integration".to_owned()]
1134            .into_iter()
1135            .chain(
1136                trybuild
1137                    .additional_hydro_features
1138                    .into_iter()
1139                    .map(|runtime_feature| {
1140                        assert!(
1141                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1142                            "{runtime_feature} is not a valid Hydro runtime feature"
1143                        );
1144                        format!("hydro___feature_{runtime_feature}")
1145                    }),
1146            )
1147            .chain(trybuild.features),
1148    );
1149
1150    for (key, value) in trybuild.build_envs {
1151        ret = ret.build_env(key, value);
1152    }
1153
1154    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1155    ret = ret.config("build.incremental = false");
1156
1157    if let Some(features) = features {
1158        ret = ret.features(features);
1159    }
1160
1161    ret
1162}