1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use slotmap::SparseSecondaryMap;
25use stageleft::{QuotedWithContext, RuntimeData};
26use syn::parse_quote;
27
28use super::deploy_runtime::*;
29use crate::compile::builder::ExternalPortId;
30use crate::compile::deploy_provider::{
31 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::compile::trybuild::generate::{
34 HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
35};
36use crate::location::dynamic::LocationId;
37use crate::location::member_id::TaglessMemberId;
38use crate::location::{LocationKey, MembershipEvent, NetworkHint};
39use crate::staging_util::get_this_crate;
40
41pub enum HydroDeploy {}
46
47impl<'a> Deploy<'a> for HydroDeploy {
48 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
50 type InstantiateEnv = Deployment;
51
52 type Process = DeployNode;
53 type Cluster = DeployCluster;
54 type External = DeployExternal;
55
56 fn o2o_sink_source(
57 _env: &mut Self::InstantiateEnv,
58 _p1: &Self::Process,
59 p1_port: &<Self::Process as Node>::Port,
60 _p2: &Self::Process,
61 p2_port: &<Self::Process as Node>::Port,
62 _name: Option<&str>,
63 ) -> (syn::Expr, syn::Expr) {
64 let p1_port = p1_port.as_str();
65 let p2_port = p2_port.as_str();
66 deploy_o2o(
67 RuntimeData::new("__hydro_lang_trybuild_cli"),
68 p1_port,
69 p2_port,
70 )
71 }
72
73 fn o2o_connect(
74 p1: &Self::Process,
75 p1_port: &<Self::Process as Node>::Port,
76 p2: &Self::Process,
77 p2_port: &<Self::Process as Node>::Port,
78 ) -> Box<dyn FnOnce()> {
79 let p1 = p1.clone();
80 let p1_port = p1_port.clone();
81 let p2 = p2.clone();
82 let p2_port = p2_port.clone();
83
84 Box::new(move || {
85 let self_underlying_borrow = p1.underlying.borrow();
86 let self_underlying = self_underlying_borrow.as_ref().unwrap();
87 let source_port = self_underlying.get_port(p1_port.clone());
88
89 let other_underlying_borrow = p2.underlying.borrow();
90 let other_underlying = other_underlying_borrow.as_ref().unwrap();
91 let recipient_port = other_underlying.get_port(p2_port.clone());
92
93 source_port.send_to(&recipient_port)
94 })
95 }
96
97 fn o2m_sink_source(
98 _env: &mut Self::InstantiateEnv,
99 _p1: &Self::Process,
100 p1_port: &<Self::Process as Node>::Port,
101 _c2: &Self::Cluster,
102 c2_port: &<Self::Cluster as Node>::Port,
103 _name: Option<&str>,
104 ) -> (syn::Expr, syn::Expr) {
105 let p1_port = p1_port.as_str();
106 let c2_port = c2_port.as_str();
107 deploy_o2m(
108 RuntimeData::new("__hydro_lang_trybuild_cli"),
109 p1_port,
110 c2_port,
111 )
112 }
113
114 fn o2m_connect(
115 p1: &Self::Process,
116 p1_port: &<Self::Process as Node>::Port,
117 c2: &Self::Cluster,
118 c2_port: &<Self::Cluster as Node>::Port,
119 ) -> Box<dyn FnOnce()> {
120 let p1 = p1.clone();
121 let p1_port = p1_port.clone();
122 let c2 = c2.clone();
123 let c2_port = c2_port.clone();
124
125 Box::new(move || {
126 let self_underlying_borrow = p1.underlying.borrow();
127 let self_underlying = self_underlying_borrow.as_ref().unwrap();
128 let source_port = self_underlying.get_port(p1_port.clone());
129
130 let recipient_port = DemuxSink {
131 demux: c2
132 .members
133 .borrow()
134 .iter()
135 .enumerate()
136 .map(|(id, c)| {
137 (
138 id as u32,
139 Arc::new(c.underlying.get_port(c2_port.clone()))
140 as Arc<dyn RustCrateSink + 'static>,
141 )
142 })
143 .collect(),
144 };
145
146 source_port.send_to(&recipient_port)
147 })
148 }
149
150 fn m2o_sink_source(
151 _env: &mut Self::InstantiateEnv,
152 _c1: &Self::Cluster,
153 c1_port: &<Self::Cluster as Node>::Port,
154 _p2: &Self::Process,
155 p2_port: &<Self::Process as Node>::Port,
156 _name: Option<&str>,
157 ) -> (syn::Expr, syn::Expr) {
158 let c1_port = c1_port.as_str();
159 let p2_port = p2_port.as_str();
160 deploy_m2o(
161 RuntimeData::new("__hydro_lang_trybuild_cli"),
162 c1_port,
163 p2_port,
164 )
165 }
166
167 fn m2o_connect(
168 c1: &Self::Cluster,
169 c1_port: &<Self::Cluster as Node>::Port,
170 p2: &Self::Process,
171 p2_port: &<Self::Process as Node>::Port,
172 ) -> Box<dyn FnOnce()> {
173 let c1 = c1.clone();
174 let c1_port = c1_port.clone();
175 let p2 = p2.clone();
176 let p2_port = p2_port.clone();
177
178 Box::new(move || {
179 let other_underlying_borrow = p2.underlying.borrow();
180 let other_underlying = other_underlying_borrow.as_ref().unwrap();
181 let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
182
183 for (i, node) in c1.members.borrow().iter().enumerate() {
184 let source_port = node.underlying.get_port(c1_port.clone());
185
186 TaggedSource {
187 source: Arc::new(source_port),
188 tag: i as u32,
189 }
190 .send_to(&recipient_port);
191 }
192 })
193 }
194
195 fn m2m_sink_source(
196 _env: &mut Self::InstantiateEnv,
197 _c1: &Self::Cluster,
198 c1_port: &<Self::Cluster as Node>::Port,
199 _c2: &Self::Cluster,
200 c2_port: &<Self::Cluster as Node>::Port,
201 _name: Option<&str>,
202 ) -> (syn::Expr, syn::Expr) {
203 let c1_port = c1_port.as_str();
204 let c2_port = c2_port.as_str();
205 deploy_m2m(
206 RuntimeData::new("__hydro_lang_trybuild_cli"),
207 c1_port,
208 c2_port,
209 )
210 }
211
212 fn m2m_connect(
213 c1: &Self::Cluster,
214 c1_port: &<Self::Cluster as Node>::Port,
215 c2: &Self::Cluster,
216 c2_port: &<Self::Cluster as Node>::Port,
217 ) -> Box<dyn FnOnce()> {
218 let c1 = c1.clone();
219 let c1_port = c1_port.clone();
220 let c2 = c2.clone();
221 let c2_port = c2_port.clone();
222
223 Box::new(move || {
224 for (i, sender) in c1.members.borrow().iter().enumerate() {
225 let source_port = sender.underlying.get_port(c1_port.clone());
226
227 let recipient_port = DemuxSink {
228 demux: c2
229 .members
230 .borrow()
231 .iter()
232 .enumerate()
233 .map(|(id, c)| {
234 (
235 id as u32,
236 Arc::new(c.underlying.get_port(c2_port.clone()).merge())
237 as Arc<dyn RustCrateSink + 'static>,
238 )
239 })
240 .collect(),
241 };
242
243 TaggedSource {
244 source: Arc::new(source_port),
245 tag: i as u32,
246 }
247 .send_to(&recipient_port);
248 }
249 })
250 }
251
252 fn e2o_many_source(
253 extra_stmts: &mut Vec<syn::Stmt>,
254 _p2: &Self::Process,
255 p2_port: &<Self::Process as Node>::Port,
256 codec_type: &syn::Type,
257 shared_handle: String,
258 ) -> syn::Expr {
259 let connect_ident = syn::Ident::new(
260 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
261 Span::call_site(),
262 );
263 let source_ident = syn::Ident::new(
264 &format!("__hydro_deploy_many_{}_source", &shared_handle),
265 Span::call_site(),
266 );
267 let sink_ident = syn::Ident::new(
268 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
269 Span::call_site(),
270 );
271 let membership_ident = syn::Ident::new(
272 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
273 Span::call_site(),
274 );
275
276 let root = get_this_crate();
277
278 extra_stmts.push(syn::parse_quote! {
279 let #connect_ident = __hydro_lang_trybuild_cli
280 .port(#p2_port)
281 .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
282 });
283
284 extra_stmts.push(syn::parse_quote! {
285 let #source_ident = #connect_ident.source;
286 });
287
288 extra_stmts.push(syn::parse_quote! {
289 let #sink_ident = #connect_ident.sink;
290 });
291
292 extra_stmts.push(syn::parse_quote! {
293 let #membership_ident = #connect_ident.membership;
294 });
295
296 parse_quote!(#source_ident)
297 }
298
299 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
300 let sink_ident = syn::Ident::new(
301 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
302 Span::call_site(),
303 );
304 parse_quote!(#sink_ident)
305 }
306
307 fn e2o_source(
308 extra_stmts: &mut Vec<syn::Stmt>,
309 _p1: &Self::External,
310 _p1_port: &<Self::External as Node>::Port,
311 _p2: &Self::Process,
312 p2_port: &<Self::Process as Node>::Port,
313 codec_type: &syn::Type,
314 shared_handle: String,
315 ) -> syn::Expr {
316 let connect_ident = syn::Ident::new(
317 &format!("__hydro_deploy_{}_connect", &shared_handle),
318 Span::call_site(),
319 );
320 let source_ident = syn::Ident::new(
321 &format!("__hydro_deploy_{}_source", &shared_handle),
322 Span::call_site(),
323 );
324 let sink_ident = syn::Ident::new(
325 &format!("__hydro_deploy_{}_sink", &shared_handle),
326 Span::call_site(),
327 );
328
329 let root = get_this_crate();
330
331 extra_stmts.push(syn::parse_quote! {
332 let #connect_ident = __hydro_lang_trybuild_cli
333 .port(#p2_port)
334 .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
335 });
336
337 extra_stmts.push(syn::parse_quote! {
338 let #source_ident = #connect_ident.source;
339 });
340
341 extra_stmts.push(syn::parse_quote! {
342 let #sink_ident = #connect_ident.sink;
343 });
344
345 parse_quote!(#source_ident)
346 }
347
348 fn e2o_connect(
349 p1: &Self::External,
350 p1_port: &<Self::External as Node>::Port,
351 p2: &Self::Process,
352 p2_port: &<Self::Process as Node>::Port,
353 _many: bool,
354 server_hint: NetworkHint,
355 ) -> Box<dyn FnOnce()> {
356 let p1 = p1.clone();
357 let p1_port = p1_port.clone();
358 let p2 = p2.clone();
359 let p2_port = p2_port.clone();
360
361 Box::new(move || {
362 let self_underlying_borrow = p1.underlying.borrow();
363 let self_underlying = self_underlying_borrow.as_ref().unwrap();
364 let source_port = self_underlying.declare_many_client();
365
366 let other_underlying_borrow = p2.underlying.borrow();
367 let other_underlying = other_underlying_borrow.as_ref().unwrap();
368 let recipient_port = other_underlying.get_port_with_hint(
369 p2_port.clone(),
370 match server_hint {
371 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
372 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
373 },
374 );
375
376 source_port.send_to(&recipient_port);
377
378 p1.client_ports
379 .borrow_mut()
380 .insert(p1_port.clone(), source_port);
381 })
382 }
383
384 fn o2e_sink(
385 _p1: &Self::Process,
386 _p1_port: &<Self::Process as Node>::Port,
387 _p2: &Self::External,
388 _p2_port: &<Self::External as Node>::Port,
389 shared_handle: String,
390 ) -> syn::Expr {
391 let sink_ident = syn::Ident::new(
392 &format!("__hydro_deploy_{}_sink", &shared_handle),
393 Span::call_site(),
394 );
395 parse_quote!(#sink_ident)
396 }
397
398 fn cluster_ids(
399 of_cluster: LocationKey,
400 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
401 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
402 }
403
404 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
405 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
406 }
407
408 fn cluster_membership_stream(
409 _env: &mut Self::InstantiateEnv,
410 _at_location: &LocationId,
411 location_id: &LocationId,
412 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
413 {
414 cluster_membership_stream(location_id)
415 }
416}
417
418#[expect(missing_docs, reason = "TODO")]
419pub trait DeployCrateWrapper {
420 fn underlying(&self) -> Arc<RustCrateService>;
421
422 fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
423 self.underlying().stdout()
424 }
425
426 fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
427 self.underlying().stderr()
428 }
429
430 fn stdout_filter(
431 &self,
432 prefix: impl Into<String>,
433 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
434 self.underlying().stdout_filter(prefix.into())
435 }
436
437 fn stderr_filter(
438 &self,
439 prefix: impl Into<String>,
440 ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
441 self.underlying().stderr_filter(prefix.into())
442 }
443}
444
445#[expect(missing_docs, reason = "TODO")]
446#[derive(Clone)]
447pub struct TrybuildHost {
448 host: Arc<dyn Host>,
449 display_name: Option<String>,
450 rustflags: Option<String>,
451 profile: Option<String>,
452 additional_hydro_features: Vec<String>,
453 features: Vec<String>,
454 tracing: Option<TracingOptions>,
455 build_envs: Vec<(String, String)>,
456 name_hint: Option<String>,
457 cluster_idx: Option<usize>,
458}
459
460impl From<Arc<dyn Host>> for TrybuildHost {
461 fn from(host: Arc<dyn Host>) -> Self {
462 Self {
463 host,
464 display_name: None,
465 rustflags: None,
466 profile: None,
467 additional_hydro_features: vec![],
468 features: vec![],
469 tracing: None,
470 build_envs: vec![],
471 name_hint: None,
472 cluster_idx: None,
473 }
474 }
475}
476
477impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
478 fn from(host: Arc<H>) -> Self {
479 Self {
480 host,
481 display_name: None,
482 rustflags: None,
483 profile: None,
484 additional_hydro_features: vec![],
485 features: vec![],
486 tracing: None,
487 build_envs: vec![],
488 name_hint: None,
489 cluster_idx: None,
490 }
491 }
492}
493
494#[expect(missing_docs, reason = "TODO")]
495impl TrybuildHost {
496 pub fn new(host: Arc<dyn Host>) -> Self {
497 Self {
498 host,
499 display_name: None,
500 rustflags: None,
501 profile: None,
502 additional_hydro_features: vec![],
503 features: vec![],
504 tracing: None,
505 build_envs: vec![],
506 name_hint: None,
507 cluster_idx: None,
508 }
509 }
510
511 pub fn display_name(self, display_name: impl Into<String>) -> Self {
512 if self.display_name.is_some() {
513 panic!("{} already set", name_of!(display_name in Self));
514 }
515
516 Self {
517 display_name: Some(display_name.into()),
518 ..self
519 }
520 }
521
522 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
523 if self.rustflags.is_some() {
524 panic!("{} already set", name_of!(rustflags in Self));
525 }
526
527 Self {
528 rustflags: Some(rustflags.into()),
529 ..self
530 }
531 }
532
533 pub fn profile(self, profile: impl Into<String>) -> Self {
534 if self.profile.is_some() {
535 panic!("{} already set", name_of!(profile in Self));
536 }
537
538 Self {
539 profile: Some(profile.into()),
540 ..self
541 }
542 }
543
544 pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
545 Self {
546 additional_hydro_features,
547 ..self
548 }
549 }
550
551 pub fn features(self, features: Vec<String>) -> Self {
552 Self {
553 features: self.features.into_iter().chain(features).collect(),
554 ..self
555 }
556 }
557
558 pub fn tracing(self, tracing: TracingOptions) -> Self {
559 if self.tracing.is_some() {
560 panic!("{} already set", name_of!(tracing in Self));
561 }
562
563 Self {
564 tracing: Some(tracing),
565 ..self
566 }
567 }
568
569 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
570 Self {
571 build_envs: self
572 .build_envs
573 .into_iter()
574 .chain(std::iter::once((key.into(), value.into())))
575 .collect(),
576 ..self
577 }
578 }
579}
580
581impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
582 type ProcessSpec = TrybuildHost;
583 fn into_process_spec(self) -> TrybuildHost {
584 TrybuildHost {
585 host: self,
586 display_name: None,
587 rustflags: None,
588 profile: None,
589 additional_hydro_features: vec![],
590 features: vec![],
591 tracing: None,
592 build_envs: vec![],
593 name_hint: None,
594 cluster_idx: None,
595 }
596 }
597}
598
599impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
600 type ProcessSpec = TrybuildHost;
601 fn into_process_spec(self) -> TrybuildHost {
602 TrybuildHost {
603 host: self,
604 display_name: None,
605 rustflags: None,
606 profile: None,
607 additional_hydro_features: vec![],
608 features: vec![],
609 tracing: None,
610 build_envs: vec![],
611 name_hint: None,
612 cluster_idx: None,
613 }
614 }
615}
616
617#[expect(missing_docs, reason = "TODO")]
618#[derive(Clone)]
619pub struct DeployExternal {
620 next_port: Rc<RefCell<usize>>,
621 host: Arc<dyn Host>,
622 underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
623 client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
624 allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
625}
626
627impl DeployExternal {
628 pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
629 self.client_ports
630 .borrow()
631 .get(
632 self.allocated_ports
633 .borrow()
634 .get(&external_port_id)
635 .unwrap(),
636 )
637 .unwrap()
638 .clone()
639 }
640}
641
642impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
643 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
644 assert!(
645 self.allocated_ports
646 .borrow_mut()
647 .insert(external_port_id, port.clone())
648 .is_none_or(|old| old == port)
649 );
650 }
651
652 fn as_bytes_bidi(
653 &self,
654 external_port_id: ExternalPortId,
655 ) -> impl Future<
656 Output = (
657 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
658 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
659 ),
660 > + 'a {
661 let port = self.raw_port(external_port_id);
662
663 async move {
664 let (source, sink) = port.connect().await.into_source_sink();
665 (
666 Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
667 Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
668 )
669 }
670 }
671
672 fn as_bincode_bidi<InT, OutT>(
673 &self,
674 external_port_id: ExternalPortId,
675 ) -> impl Future<
676 Output = (
677 Pin<Box<dyn Stream<Item = OutT>>>,
678 Pin<Box<dyn Sink<InT, Error = Error>>>,
679 ),
680 > + 'a
681 where
682 InT: Serialize + 'static,
683 OutT: DeserializeOwned + 'static,
684 {
685 let port = self.raw_port(external_port_id);
686 async move {
687 let (source, sink) = port.connect().await.into_source_sink();
688 (
689 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
690 as Pin<Box<dyn Stream<Item = OutT>>>,
691 Box::pin(
692 sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
693 ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
694 )
695 }
696 }
697
698 fn as_bincode_sink<T: Serialize + 'static>(
699 &self,
700 external_port_id: ExternalPortId,
701 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
702 let port = self.raw_port(external_port_id);
703 async move {
704 let sink = port.connect().await.into_sink();
705 Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
706 as Pin<Box<dyn Sink<T, Error = Error>>>
707 }
708 }
709
710 fn as_bincode_source<T: DeserializeOwned + 'static>(
711 &self,
712 external_port_id: ExternalPortId,
713 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
714 let port = self.raw_port(external_port_id);
715 async move {
716 let source = port.connect().await.into_source();
717 Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
718 as Pin<Box<dyn Stream<Item = T>>>
719 }
720 }
721}
722
723impl Node for DeployExternal {
724 type Port = String;
725 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
727 type InstantiateEnv = Deployment;
728
729 fn next_port(&self) -> Self::Port {
730 let next_port = *self.next_port.borrow();
731 *self.next_port.borrow_mut() += 1;
732
733 format!("port_{}", next_port)
734 }
735
736 fn instantiate(
737 &self,
738 env: &mut Self::InstantiateEnv,
739 _meta: &mut Self::Meta,
740 _graph: DfirGraph,
741 extra_stmts: &[syn::Stmt],
742 sidecars: &[syn::Expr],
743 ) {
744 assert!(extra_stmts.is_empty());
745 assert!(sidecars.is_empty());
746 let service = env.CustomService(self.host.clone(), vec![]);
747 *self.underlying.borrow_mut() = Some(service);
748 }
749
750 fn update_meta(&self, _meta: &Self::Meta) {}
751}
752
753impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
754 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
755 DeployExternal {
756 next_port: Rc::new(RefCell::new(0)),
757 host: self,
758 underlying: Rc::new(RefCell::new(None)),
759 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
760 client_ports: Rc::new(RefCell::new(HashMap::new())),
761 }
762 }
763}
764
765impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
766 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
767 DeployExternal {
768 next_port: Rc::new(RefCell::new(0)),
769 host: self,
770 underlying: Rc::new(RefCell::new(None)),
771 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
772 client_ports: Rc::new(RefCell::new(HashMap::new())),
773 }
774 }
775}
776
777pub(crate) enum CrateOrTrybuild {
778 Crate(RustCrate, Arc<dyn Host>),
779 Trybuild(TrybuildHost),
780}
781
782#[expect(missing_docs, reason = "TODO")]
783#[derive(Clone)]
784pub struct DeployNode {
785 next_port: Rc<RefCell<usize>>,
786 service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
787 underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
788}
789
790impl DeployCrateWrapper for DeployNode {
791 fn underlying(&self) -> Arc<RustCrateService> {
792 Arc::clone(self.underlying.borrow().as_ref().unwrap())
793 }
794}
795
796impl Node for DeployNode {
797 type Port = String;
798 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
800 type InstantiateEnv = Deployment;
801
802 fn next_port(&self) -> String {
803 let next_port = *self.next_port.borrow();
804 *self.next_port.borrow_mut() += 1;
805
806 format!("port_{}", next_port)
807 }
808
809 fn update_meta(&self, meta: &Self::Meta) {
810 let underlying_node = self.underlying.borrow();
811 underlying_node.as_ref().unwrap().update_meta(HydroMeta {
812 clusters: meta.clone(),
813 cluster_id: None,
814 });
815 }
816
817 fn instantiate(
818 &self,
819 env: &mut Self::InstantiateEnv,
820 _meta: &mut Self::Meta,
821 graph: DfirGraph,
822 extra_stmts: &[syn::Stmt],
823 sidecars: &[syn::Expr],
824 ) {
825 let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
826 CrateOrTrybuild::Crate(c, host) => (c, host),
827 CrateOrTrybuild::Trybuild(trybuild) => {
828 let linking_mode = if !cfg!(target_os = "windows")
830 && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
831 {
832 LinkingMode::Dynamic
835 } else {
836 LinkingMode::Static
837 };
838 let (bin_name, config) = create_graph_trybuild(
839 graph,
840 extra_stmts,
841 sidecars,
842 trybuild.name_hint.as_deref(),
843 crate::compile::trybuild::generate::DeployMode::HydroDeploy,
844 linking_mode,
845 );
846 let host = trybuild.host.clone();
847 (
848 create_trybuild_service(
849 trybuild,
850 &config.project_dir,
851 &config.target_dir,
852 config.features.as_deref(),
853 &bin_name,
854 &config.linking_mode,
855 ),
856 host,
857 )
858 }
859 };
860
861 *self.underlying.borrow_mut() = Some(env.add_service(service, host));
862 }
863}
864
865#[expect(missing_docs, reason = "TODO")]
866#[derive(Clone)]
867pub struct DeployClusterNode {
868 underlying: Arc<RustCrateService>,
869}
870
871impl DeployCrateWrapper for DeployClusterNode {
872 fn underlying(&self) -> Arc<RustCrateService> {
873 self.underlying.clone()
874 }
875}
876#[expect(missing_docs, reason = "TODO")]
877#[derive(Clone)]
878pub struct DeployCluster {
879 key: LocationKey,
880 next_port: Rc<RefCell<usize>>,
881 cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
882 members: Rc<RefCell<Vec<DeployClusterNode>>>,
883 name_hint: Option<String>,
884}
885
886impl DeployCluster {
887 #[expect(missing_docs, reason = "TODO")]
888 pub fn members(&self) -> Vec<DeployClusterNode> {
889 self.members.borrow().clone()
890 }
891}
892
893impl Node for DeployCluster {
894 type Port = String;
895 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
897 type InstantiateEnv = Deployment;
898
899 fn next_port(&self) -> String {
900 let next_port = *self.next_port.borrow();
901 *self.next_port.borrow_mut() += 1;
902
903 format!("port_{}", next_port)
904 }
905
906 fn instantiate(
907 &self,
908 env: &mut Self::InstantiateEnv,
909 meta: &mut Self::Meta,
910 graph: DfirGraph,
911 extra_stmts: &[syn::Stmt],
912 sidecars: &[syn::Expr],
913 ) {
914 let has_trybuild = self
915 .cluster_spec
916 .borrow()
917 .as_ref()
918 .unwrap()
919 .iter()
920 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
921
922 let linking_mode = if !cfg!(target_os = "windows")
924 && self
925 .cluster_spec
926 .borrow()
927 .as_ref()
928 .unwrap()
929 .iter()
930 .all(|spec| match spec {
931 CrateOrTrybuild::Crate(_, _) => true, CrateOrTrybuild::Trybuild(t) => {
933 t.host.target_type() == hydro_deploy::HostTargetType::Local
934 }
935 }) {
936 LinkingMode::Dynamic
938 } else {
939 LinkingMode::Static
940 };
941
942 let maybe_trybuild = if has_trybuild {
943 Some(create_graph_trybuild(
944 graph,
945 extra_stmts,
946 sidecars,
947 self.name_hint.as_deref(),
948 crate::compile::trybuild::generate::DeployMode::HydroDeploy,
949 linking_mode,
950 ))
951 } else {
952 None
953 };
954
955 let cluster_nodes = self
956 .cluster_spec
957 .borrow_mut()
958 .take()
959 .unwrap()
960 .into_iter()
961 .map(|spec| {
962 let (service, host) = match spec {
963 CrateOrTrybuild::Crate(c, host) => (c, host),
964 CrateOrTrybuild::Trybuild(trybuild) => {
965 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
966 let host = trybuild.host.clone();
967 (
968 create_trybuild_service(
969 trybuild,
970 &config.project_dir,
971 &config.target_dir,
972 config.features.as_deref(),
973 bin_name,
974 &config.linking_mode,
975 ),
976 host,
977 )
978 }
979 };
980
981 env.add_service(service, host)
982 })
983 .collect::<Vec<_>>();
984 meta.insert(
985 self.key,
986 (0..(cluster_nodes.len() as u32))
987 .map(TaglessMemberId::from_raw_id)
988 .collect(),
989 );
990 *self.members.borrow_mut() = cluster_nodes
991 .into_iter()
992 .map(|n| DeployClusterNode { underlying: n })
993 .collect();
994 }
995
996 fn update_meta(&self, meta: &Self::Meta) {
997 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
998 node.underlying.update_meta(HydroMeta {
999 clusters: meta.clone(),
1000 cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
1001 });
1002 }
1003 }
1004}
1005
1006#[expect(missing_docs, reason = "TODO")]
1007#[derive(Clone)]
1008pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
1009
1010impl DeployProcessSpec {
1011 #[expect(missing_docs, reason = "TODO")]
1012 pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
1013 Self(t, host)
1014 }
1015}
1016
1017impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
1018 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployNode {
1019 DeployNode {
1020 next_port: Rc::new(RefCell::new(0)),
1021 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1022 underlying: Rc::new(RefCell::new(None)),
1023 }
1024 }
1025}
1026
1027impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1028 fn build(mut self, key: LocationKey, name_hint: &str) -> DeployNode {
1029 self.name_hint = Some(format!("{} (process {})", name_hint, key));
1030 DeployNode {
1031 next_port: Rc::new(RefCell::new(0)),
1032 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1033 underlying: Rc::new(RefCell::new(None)),
1034 }
1035 }
1036}
1037
1038#[expect(missing_docs, reason = "TODO")]
1039#[derive(Clone)]
1040pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1041
1042impl DeployClusterSpec {
1043 #[expect(missing_docs, reason = "TODO")]
1044 pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1045 Self(crates)
1046 }
1047}
1048
1049impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1050 fn build(self, key: LocationKey, _name_hint: &str) -> DeployCluster {
1051 DeployCluster {
1052 key,
1053 next_port: Rc::new(RefCell::new(0)),
1054 cluster_spec: Rc::new(RefCell::new(Some(
1055 self.0
1056 .into_iter()
1057 .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1058 .collect(),
1059 ))),
1060 members: Rc::new(RefCell::new(vec![])),
1061 name_hint: None,
1062 }
1063 }
1064}
1065
1066impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1067 fn build(self, key: LocationKey, name_hint: &str) -> DeployCluster {
1068 let name_hint = format!("{} (cluster {})", name_hint, key);
1069 DeployCluster {
1070 key,
1071 next_port: Rc::new(RefCell::new(0)),
1072 cluster_spec: Rc::new(RefCell::new(Some(
1073 self.into_iter()
1074 .enumerate()
1075 .map(|(idx, b)| {
1076 let mut b = b.into();
1077 b.name_hint = Some(name_hint.clone());
1078 b.cluster_idx = Some(idx);
1079 CrateOrTrybuild::Trybuild(b)
1080 })
1081 .collect(),
1082 ))),
1083 members: Rc::new(RefCell::new(vec![])),
1084 name_hint: Some(name_hint),
1085 }
1086 }
1087}
1088
1089fn create_trybuild_service(
1090 trybuild: TrybuildHost,
1091 dir: &std::path::Path,
1092 target_dir: &std::path::PathBuf,
1093 features: Option<&[String]>,
1094 bin_name: &str,
1095 linking_mode: &LinkingMode,
1096) -> RustCrate {
1097 let crate_dir = match linking_mode {
1099 LinkingMode::Dynamic => dir.join("dylib-examples"),
1100 LinkingMode::Static => dir.to_path_buf(),
1101 };
1102
1103 let mut ret = RustCrate::new(&crate_dir, dir)
1104 .target_dir(target_dir)
1105 .example(bin_name)
1106 .no_default_features();
1107
1108 ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1109
1110 if let Some(display_name) = trybuild.display_name {
1111 ret = ret.display_name(display_name);
1112 } else if let Some(name_hint) = trybuild.name_hint {
1113 if let Some(cluster_idx) = trybuild.cluster_idx {
1114 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1115 } else {
1116 ret = ret.display_name(name_hint);
1117 }
1118 }
1119
1120 if let Some(rustflags) = trybuild.rustflags {
1121 ret = ret.rustflags(rustflags);
1122 }
1123
1124 if let Some(profile) = trybuild.profile {
1125 ret = ret.profile(profile);
1126 }
1127
1128 if let Some(tracing) = trybuild.tracing {
1129 ret = ret.tracing(tracing);
1130 }
1131
1132 ret = ret.features(
1133 vec!["hydro___feature_deploy_integration".to_owned()]
1134 .into_iter()
1135 .chain(
1136 trybuild
1137 .additional_hydro_features
1138 .into_iter()
1139 .map(|runtime_feature| {
1140 assert!(
1141 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1142 "{runtime_feature} is not a valid Hydro runtime feature"
1143 );
1144 format!("hydro___feature_{runtime_feature}")
1145 }),
1146 )
1147 .chain(trybuild.features),
1148 );
1149
1150 for (key, value) in trybuild.build_envs {
1151 ret = ret.build_env(key, value);
1152 }
1153
1154 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1155 ret = ret.config("build.incremental = false");
1156
1157 if let Some(features) = features {
1158 ret = ret.features(features);
1159 }
1160
1161 ret
1162}