1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{
34 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
35};
36use crate::forward_handle::ForwardRef;
37#[cfg(stageleft_runtime)]
38use crate::forward_handle::{CycleCollection, ForwardHandle};
39use crate::live_collections::boundedness::{Bounded, Unbounded};
40use crate::live_collections::keyed_stream::KeyedStream;
41use crate::live_collections::singleton::Singleton;
42use crate::live_collections::stream::{
43 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
44};
45use crate::location::dynamic::LocationId;
46use crate::location::external_process::{
47 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
48};
49use crate::nondet::NonDet;
50#[cfg(feature = "sim")]
51use crate::sim::SimSender;
52use crate::staging_util::get_this_crate;
53
54pub mod dynamic;
55
56#[expect(missing_docs, reason = "TODO")]
57pub mod external_process;
58pub use external_process::External;
59
60#[expect(missing_docs, reason = "TODO")]
61pub mod process;
62pub use process::Process;
63
64#[expect(missing_docs, reason = "TODO")]
65pub mod cluster;
66pub use cluster::Cluster;
67
68#[expect(missing_docs, reason = "TODO")]
69pub mod member_id;
70pub use member_id::{MemberId, TaglessMemberId};
71
72#[expect(missing_docs, reason = "TODO")]
73pub mod tick;
74pub use tick::{Atomic, NoTick, Tick};
75
76#[expect(missing_docs, reason = "TODO")]
77#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
78pub enum MembershipEvent {
79 Joined,
80 Left,
81}
82
83#[expect(missing_docs, reason = "TODO")]
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
85pub enum NetworkHint {
86 Auto,
87 TcpPort(Option<u16>),
88}
89
90pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
91 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
92}
93
94#[stageleft::export(LocationKey)]
95new_key_type! {
96 pub struct LocationKey;
98}
99
100impl std::fmt::Display for LocationKey {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 write!(f, "loc{:?}", self.data()) }
104}
105
106impl std::str::FromStr for LocationKey {
109 type Err = Option<ParseIntError>;
110
111 fn from_str(s: &str) -> Result<Self, Self::Err> {
112 let nvn = s.strip_prefix("loc").ok_or(None)?;
113 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
114 let idx: u64 = idx.parse()?;
115 let ver: u64 = ver.parse()?;
116 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
117 }
118}
119
120impl LocationKey {
121 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
127 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); #[cfg(test)]
131 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); }
133
134impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
136 type O = LocationKey;
137
138 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
139 where
140 Self: Sized,
141 {
142 let root = get_this_crate();
143 let n = Key::data(&self).as_ffi();
144 (
145 QuoteTokens {
146 prelude: None,
147 expr: Some(quote! {
148 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
149 }),
150 },
151 (),
152 )
153 }
154}
155
156#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
158pub enum LocationType {
159 Process,
161 Cluster,
163 External,
165}
166
167#[expect(
181 private_bounds,
182 reason = "only internal Hydro code can define location types"
183)]
184pub trait Location<'a>: dynamic::DynLocation {
185 type Root: Location<'a>;
190
191 fn root(&self) -> Self::Root;
196
197 fn try_tick(&self) -> Option<Tick<Self>> {
204 if Self::is_top_level() {
205 let id = self.flow_state().borrow_mut().next_clock_id();
206 Some(Tick {
207 id,
208 l: self.clone(),
209 })
210 } else {
211 None
212 }
213 }
214
215 fn id(&self) -> LocationId {
217 dynamic::DynLocation::id(self)
218 }
219
220 fn tick(&self) -> Tick<Self>
246 where
247 Self: NoTick,
248 {
249 let id = self.flow_state().borrow_mut().next_clock_id();
250 Tick {
251 id,
252 l: self.clone(),
253 }
254 }
255
256 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
281 where
282 Self: Sized + NoTick,
283 {
284 Stream::new(
285 self.clone(),
286 HydroNode::Source {
287 source: HydroSource::Spin(),
288 metadata: self.new_node_metadata(Stream::<
289 (),
290 Self,
291 Unbounded,
292 TotalOrder,
293 ExactlyOnce,
294 >::collection_kind()),
295 },
296 )
297 }
298
299 fn source_stream<T, E>(
320 &self,
321 e: impl QuotedWithContext<'a, E, Self>,
322 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
323 where
324 E: FuturesStream<Item = T> + Unpin,
325 Self: Sized + NoTick,
326 {
327 let e = e.splice_untyped_ctx(self);
328
329 Stream::new(
330 self.clone(),
331 HydroNode::Source {
332 source: HydroSource::Stream(e.into()),
333 metadata: self.new_node_metadata(Stream::<
334 T,
335 Self,
336 Unbounded,
337 TotalOrder,
338 ExactlyOnce,
339 >::collection_kind()),
340 },
341 )
342 }
343
344 fn source_iter<T, E>(
366 &self,
367 e: impl QuotedWithContext<'a, E, Self>,
368 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
369 where
370 E: IntoIterator<Item = T>,
371 Self: Sized + NoTick,
372 {
373 let e = e.splice_typed_ctx(self);
374
375 Stream::new(
376 self.clone(),
377 HydroNode::Source {
378 source: HydroSource::Iter(e.into()),
379 metadata: self.new_node_metadata(
380 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
381 ),
382 },
383 )
384 }
385
386 fn source_cluster_members<C: 'a>(
420 &self,
421 cluster: &Cluster<'a, C>,
422 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
423 where
424 Self: Sized + NoTick,
425 {
426 Stream::new(
427 self.clone(),
428 HydroNode::Source {
429 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
430 metadata: self.new_node_metadata(Stream::<
431 (TaglessMemberId, MembershipEvent),
432 Self,
433 Unbounded,
434 TotalOrder,
435 ExactlyOnce,
436 >::collection_kind()),
437 },
438 )
439 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
440 .into_keyed()
441 }
442
443 fn source_external_bytes<L>(
451 &self,
452 from: &External<L>,
453 ) -> (
454 ExternalBytesPort,
455 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
456 )
457 where
458 Self: Sized + NoTick,
459 {
460 let (port, stream, sink) =
461 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
462
463 sink.complete(self.source_iter(q!([])));
464
465 (port, stream)
466 }
467
468 #[expect(clippy::type_complexity, reason = "stream markers")]
475 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
476 &self,
477 from: &External<L>,
478 ) -> (
479 ExternalBincodeSink<T, NotMany, O, R>,
480 Stream<T, Self, Unbounded, O, R>,
481 )
482 where
483 Self: Sized + NoTick,
484 T: Serialize + DeserializeOwned,
485 {
486 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
487 sink.complete(self.source_iter(q!([])));
488
489 (
490 ExternalBincodeSink {
491 process_key: from.key,
492 port_id: port.port_id,
493 _phantom: PhantomData,
494 },
495 stream.weaken_ordering().weaken_retries(),
496 )
497 }
498
499 #[cfg(feature = "sim")]
504 #[expect(clippy::type_complexity, reason = "stream markers")]
505 fn sim_input<T, O: Ordering, R: Retries>(
506 &self,
507 ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
508 where
509 Self: Sized + NoTick,
510 T: Serialize + DeserializeOwned,
511 {
512 let external_location: External<'a, ()> = External {
513 key: LocationKey::FIRST,
514 flow_state: self.flow_state().clone(),
515 _phantom: PhantomData,
516 };
517
518 let (external, stream) = self.source_external_bincode(&external_location);
519
520 (SimSender(external.port_id, PhantomData), stream)
521 }
522
523 fn embedded_input<T>(
529 &self,
530 name: impl Into<String>,
531 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
532 where
533 Self: Sized + NoTick,
534 {
535 let ident = syn::Ident::new(&name.into(), Span::call_site());
536
537 Stream::new(
538 self.clone(),
539 HydroNode::Source {
540 source: HydroSource::Embedded(ident),
541 metadata: self.new_node_metadata(Stream::<
542 T,
543 Self,
544 Unbounded,
545 TotalOrder,
546 ExactlyOnce,
547 >::collection_kind()),
548 },
549 )
550 }
551
552 #[expect(clippy::type_complexity, reason = "stream markers")]
597 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
598 &self,
599 from: &External<L>,
600 port_hint: NetworkHint,
601 ) -> (
602 ExternalBytesPort<NotMany>,
603 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
604 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
605 )
606 where
607 Self: Sized + NoTick,
608 {
609 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
610
611 let (fwd_ref, to_sink) =
612 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
613 let mut flow_state_borrow = self.flow_state().borrow_mut();
614
615 flow_state_borrow.push_root(HydroRoot::SendExternal {
616 to_external_key: from.key,
617 to_port_id: next_external_port_id,
618 to_many: false,
619 unpaired: false,
620 serialize_fn: None,
621 instantiate_fn: DebugInstantiate::Building,
622 input: Box::new(to_sink.ir_node.into_inner()),
623 op_metadata: HydroIrOpMetadata::new(),
624 });
625
626 let raw_stream: Stream<
627 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
628 Self,
629 Unbounded,
630 TotalOrder,
631 ExactlyOnce,
632 > = Stream::new(
633 self.clone(),
634 HydroNode::ExternalInput {
635 from_external_key: from.key,
636 from_port_id: next_external_port_id,
637 from_many: false,
638 codec_type: quote_type::<Codec>().into(),
639 port_hint,
640 instantiate_fn: DebugInstantiate::Building,
641 deserialize_fn: None,
642 metadata: self.new_node_metadata(Stream::<
643 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
644 Self,
645 Unbounded,
646 TotalOrder,
647 ExactlyOnce,
648 >::collection_kind()),
649 },
650 );
651
652 (
653 ExternalBytesPort {
654 process_key: from.key,
655 port_id: next_external_port_id,
656 _phantom: PhantomData,
657 },
658 raw_stream.flatten_ordered(),
659 fwd_ref,
660 )
661 }
662
663 #[expect(clippy::type_complexity, reason = "stream markers")]
673 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
674 &self,
675 from: &External<L>,
676 ) -> (
677 ExternalBincodeBidi<InT, OutT, NotMany>,
678 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
679 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
680 )
681 where
682 Self: Sized + NoTick,
683 {
684 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
685
686 let (fwd_ref, to_sink) =
687 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
688 let mut flow_state_borrow = self.flow_state().borrow_mut();
689
690 let root = get_this_crate();
691
692 let out_t_type = quote_type::<OutT>();
693 let ser_fn: syn::Expr = syn::parse_quote! {
694 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
695 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
696 )
697 };
698
699 flow_state_borrow.push_root(HydroRoot::SendExternal {
700 to_external_key: from.key,
701 to_port_id: next_external_port_id,
702 to_many: false,
703 unpaired: false,
704 serialize_fn: Some(ser_fn.into()),
705 instantiate_fn: DebugInstantiate::Building,
706 input: Box::new(to_sink.ir_node.into_inner()),
707 op_metadata: HydroIrOpMetadata::new(),
708 });
709
710 let in_t_type = quote_type::<InT>();
711
712 let deser_fn: syn::Expr = syn::parse_quote! {
713 |res| {
714 let b = res.unwrap();
715 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
716 }
717 };
718
719 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
720 self.clone(),
721 HydroNode::ExternalInput {
722 from_external_key: from.key,
723 from_port_id: next_external_port_id,
724 from_many: false,
725 codec_type: quote_type::<LengthDelimitedCodec>().into(),
726 port_hint: NetworkHint::Auto,
727 instantiate_fn: DebugInstantiate::Building,
728 deserialize_fn: Some(deser_fn.into()),
729 metadata: self.new_node_metadata(Stream::<
730 InT,
731 Self,
732 Unbounded,
733 TotalOrder,
734 ExactlyOnce,
735 >::collection_kind()),
736 },
737 );
738
739 (
740 ExternalBincodeBidi {
741 process_key: from.key,
742 port_id: next_external_port_id,
743 _phantom: PhantomData,
744 },
745 raw_stream,
746 fwd_ref,
747 )
748 }
749
750 #[expect(clippy::type_complexity, reason = "stream markers")]
762 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
763 &self,
764 from: &External<L>,
765 port_hint: NetworkHint,
766 ) -> (
767 ExternalBytesPort<Many>,
768 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
769 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
770 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
771 )
772 where
773 Self: Sized + NoTick,
774 {
775 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
776
777 let (fwd_ref, to_sink) =
778 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
779 let mut flow_state_borrow = self.flow_state().borrow_mut();
780
781 flow_state_borrow.push_root(HydroRoot::SendExternal {
782 to_external_key: from.key,
783 to_port_id: next_external_port_id,
784 to_many: true,
785 unpaired: false,
786 serialize_fn: None,
787 instantiate_fn: DebugInstantiate::Building,
788 input: Box::new(to_sink.entries().ir_node.into_inner()),
789 op_metadata: HydroIrOpMetadata::new(),
790 });
791
792 let raw_stream: Stream<
793 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
794 Self,
795 Unbounded,
796 TotalOrder,
797 ExactlyOnce,
798 > = Stream::new(
799 self.clone(),
800 HydroNode::ExternalInput {
801 from_external_key: from.key,
802 from_port_id: next_external_port_id,
803 from_many: true,
804 codec_type: quote_type::<Codec>().into(),
805 port_hint,
806 instantiate_fn: DebugInstantiate::Building,
807 deserialize_fn: None,
808 metadata: self.new_node_metadata(Stream::<
809 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
810 Self,
811 Unbounded,
812 TotalOrder,
813 ExactlyOnce,
814 >::collection_kind()),
815 },
816 );
817
818 let membership_stream_ident = syn::Ident::new(
819 &format!(
820 "__hydro_deploy_many_{}_{}_membership",
821 from.key, next_external_port_id
822 ),
823 Span::call_site(),
824 );
825 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
826 let raw_membership_stream: KeyedStream<
827 u64,
828 bool,
829 Self,
830 Unbounded,
831 TotalOrder,
832 ExactlyOnce,
833 > = KeyedStream::new(
834 self.clone(),
835 HydroNode::Source {
836 source: HydroSource::Stream(membership_stream_expr.into()),
837 metadata: self.new_node_metadata(KeyedStream::<
838 u64,
839 bool,
840 Self,
841 Unbounded,
842 TotalOrder,
843 ExactlyOnce,
844 >::collection_kind()),
845 },
846 );
847
848 (
849 ExternalBytesPort {
850 process_key: from.key,
851 port_id: next_external_port_id,
852 _phantom: PhantomData,
853 },
854 raw_stream
855 .flatten_ordered() .into_keyed(),
857 raw_membership_stream.map(q!(|join| {
858 if join {
859 MembershipEvent::Joined
860 } else {
861 MembershipEvent::Left
862 }
863 })),
864 fwd_ref,
865 )
866 }
867
868 #[expect(clippy::type_complexity, reason = "stream markers")]
884 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
885 &self,
886 from: &External<L>,
887 ) -> (
888 ExternalBincodeBidi<InT, OutT, Many>,
889 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
890 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
891 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
892 )
893 where
894 Self: Sized + NoTick,
895 {
896 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
897
898 let (fwd_ref, to_sink) =
899 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
900 let mut flow_state_borrow = self.flow_state().borrow_mut();
901
902 let root = get_this_crate();
903
904 let out_t_type = quote_type::<OutT>();
905 let ser_fn: syn::Expr = syn::parse_quote! {
906 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
907 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
908 )
909 };
910
911 flow_state_borrow.push_root(HydroRoot::SendExternal {
912 to_external_key: from.key,
913 to_port_id: next_external_port_id,
914 to_many: true,
915 unpaired: false,
916 serialize_fn: Some(ser_fn.into()),
917 instantiate_fn: DebugInstantiate::Building,
918 input: Box::new(to_sink.entries().ir_node.into_inner()),
919 op_metadata: HydroIrOpMetadata::new(),
920 });
921
922 let in_t_type = quote_type::<InT>();
923
924 let deser_fn: syn::Expr = syn::parse_quote! {
925 |res| {
926 let (id, b) = res.unwrap();
927 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
928 }
929 };
930
931 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
932 KeyedStream::new(
933 self.clone(),
934 HydroNode::ExternalInput {
935 from_external_key: from.key,
936 from_port_id: next_external_port_id,
937 from_many: true,
938 codec_type: quote_type::<LengthDelimitedCodec>().into(),
939 port_hint: NetworkHint::Auto,
940 instantiate_fn: DebugInstantiate::Building,
941 deserialize_fn: Some(deser_fn.into()),
942 metadata: self.new_node_metadata(KeyedStream::<
943 u64,
944 InT,
945 Self,
946 Unbounded,
947 TotalOrder,
948 ExactlyOnce,
949 >::collection_kind()),
950 },
951 );
952
953 let membership_stream_ident = syn::Ident::new(
954 &format!(
955 "__hydro_deploy_many_{}_{}_membership",
956 from.key, next_external_port_id
957 ),
958 Span::call_site(),
959 );
960 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
961 let raw_membership_stream: KeyedStream<
962 u64,
963 bool,
964 Self,
965 Unbounded,
966 TotalOrder,
967 ExactlyOnce,
968 > = KeyedStream::new(
969 self.clone(),
970 HydroNode::Source {
971 source: HydroSource::Stream(membership_stream_expr.into()),
972 metadata: self.new_node_metadata(KeyedStream::<
973 u64,
974 bool,
975 Self,
976 Unbounded,
977 TotalOrder,
978 ExactlyOnce,
979 >::collection_kind()),
980 },
981 );
982
983 (
984 ExternalBincodeBidi {
985 process_key: from.key,
986 port_id: next_external_port_id,
987 _phantom: PhantomData,
988 },
989 raw_stream,
990 raw_membership_stream.map(q!(|join| {
991 if join {
992 MembershipEvent::Joined
993 } else {
994 MembershipEvent::Left
995 }
996 })),
997 fwd_ref,
998 )
999 }
1000
1001 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1019 where
1020 T: Clone,
1021 Self: Sized,
1022 {
1023 let e = e.splice_untyped_ctx(self);
1024
1025 Singleton::new(
1026 self.clone(),
1027 HydroNode::SingletonSource {
1028 value: e.into(),
1029 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1030 },
1031 )
1032 }
1033
1034 fn source_interval(
1044 &self,
1045 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1046 _nondet: NonDet,
1047 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1048 where
1049 Self: Sized + NoTick,
1050 {
1051 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1052 tokio::time::interval(interval)
1053 )))
1054 }
1055
1056 fn source_interval_delayed(
1067 &self,
1068 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1069 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1070 _nondet: NonDet,
1071 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1072 where
1073 Self: Sized + NoTick,
1074 {
1075 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1076 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1077 )))
1078 }
1079
1080 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1114 where
1115 S: CycleCollection<'a, ForwardRef, Location = Self>,
1116 {
1117 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1118 (
1119 ForwardHandle::new(cycle_id, Location::id(self)),
1120 S::create_source(cycle_id, self.clone()),
1121 )
1122 }
1123}
1124
1125#[cfg(feature = "deploy")]
1126#[cfg(test)]
1127mod tests {
1128 use std::collections::HashSet;
1129
1130 use futures::{SinkExt, StreamExt};
1131 use hydro_deploy::Deployment;
1132 use stageleft::q;
1133 use tokio_util::codec::LengthDelimitedCodec;
1134
1135 use crate::compile::builder::FlowBuilder;
1136 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1137 use crate::location::{Location, NetworkHint};
1138 use crate::nondet::nondet;
1139
1140 #[tokio::test]
1141 async fn top_level_singleton_replay_cardinality() {
1142 let mut deployment = Deployment::new();
1143
1144 let mut flow = FlowBuilder::new();
1145 let node = flow.process::<()>();
1146 let external = flow.external::<()>();
1147
1148 let (in_port, input) =
1149 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1150 let singleton = node.singleton(q!(123));
1151 let tick = node.tick();
1152 let out = input
1153 .batch(&tick, nondet!())
1154 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1155 .cross_singleton(
1156 singleton
1157 .snapshot(&tick, nondet!())
1158 .into_stream()
1159 .count(),
1160 )
1161 .all_ticks()
1162 .send_bincode_external(&external);
1163
1164 let nodes = flow
1165 .with_process(&node, deployment.Localhost())
1166 .with_external(&external, deployment.Localhost())
1167 .deploy(&mut deployment);
1168
1169 deployment.deploy().await.unwrap();
1170
1171 let mut external_in = nodes.connect(in_port).await;
1172 let mut external_out = nodes.connect(out).await;
1173
1174 deployment.start().await.unwrap();
1175
1176 external_in.send(1).await.unwrap();
1177 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1178
1179 external_in.send(2).await.unwrap();
1180 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1181 }
1182
1183 #[tokio::test]
1184 async fn tick_singleton_replay_cardinality() {
1185 let mut deployment = Deployment::new();
1186
1187 let mut flow = FlowBuilder::new();
1188 let node = flow.process::<()>();
1189 let external = flow.external::<()>();
1190
1191 let (in_port, input) =
1192 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1193 let tick = node.tick();
1194 let singleton = tick.singleton(q!(123));
1195 let out = input
1196 .batch(&tick, nondet!())
1197 .cross_singleton(singleton.clone())
1198 .cross_singleton(singleton.into_stream().count())
1199 .all_ticks()
1200 .send_bincode_external(&external);
1201
1202 let nodes = flow
1203 .with_process(&node, deployment.Localhost())
1204 .with_external(&external, deployment.Localhost())
1205 .deploy(&mut deployment);
1206
1207 deployment.deploy().await.unwrap();
1208
1209 let mut external_in = nodes.connect(in_port).await;
1210 let mut external_out = nodes.connect(out).await;
1211
1212 deployment.start().await.unwrap();
1213
1214 external_in.send(1).await.unwrap();
1215 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1216
1217 external_in.send(2).await.unwrap();
1218 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1219 }
1220
1221 #[tokio::test]
1222 async fn external_bytes() {
1223 let mut deployment = Deployment::new();
1224
1225 let mut flow = FlowBuilder::new();
1226 let first_node = flow.process::<()>();
1227 let external = flow.external::<()>();
1228
1229 let (in_port, input) = first_node.source_external_bytes(&external);
1230 let out = input.send_bincode_external(&external);
1231
1232 let nodes = flow
1233 .with_process(&first_node, deployment.Localhost())
1234 .with_external(&external, deployment.Localhost())
1235 .deploy(&mut deployment);
1236
1237 deployment.deploy().await.unwrap();
1238
1239 let mut external_in = nodes.connect(in_port).await.1;
1240 let mut external_out = nodes.connect(out).await;
1241
1242 deployment.start().await.unwrap();
1243
1244 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1245
1246 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1247 }
1248
1249 #[tokio::test]
1250 async fn multi_external_source() {
1251 let mut deployment = Deployment::new();
1252
1253 let mut flow = FlowBuilder::new();
1254 let first_node = flow.process::<()>();
1255 let external = flow.external::<()>();
1256
1257 let (in_port, input, _membership, complete_sink) =
1258 first_node.bidi_external_many_bincode(&external);
1259 let out = input.entries().send_bincode_external(&external);
1260 complete_sink.complete(
1261 first_node
1262 .source_iter::<(u64, ()), _>(q!([]))
1263 .into_keyed()
1264 .weaken_ordering(),
1265 );
1266
1267 let nodes = flow
1268 .with_process(&first_node, deployment.Localhost())
1269 .with_external(&external, deployment.Localhost())
1270 .deploy(&mut deployment);
1271
1272 deployment.deploy().await.unwrap();
1273
1274 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1275 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1276 let external_out = nodes.connect(out).await;
1277
1278 deployment.start().await.unwrap();
1279
1280 external_in_1.send(123).await.unwrap();
1281 external_in_2.send(456).await.unwrap();
1282
1283 assert_eq!(
1284 external_out.take(2).collect::<HashSet<_>>().await,
1285 vec![(0, 123), (1, 456)].into_iter().collect()
1286 );
1287 }
1288
1289 #[tokio::test]
1290 async fn second_connection_only_multi_source() {
1291 let mut deployment = Deployment::new();
1292
1293 let mut flow = FlowBuilder::new();
1294 let first_node = flow.process::<()>();
1295 let external = flow.external::<()>();
1296
1297 let (in_port, input, _membership, complete_sink) =
1298 first_node.bidi_external_many_bincode(&external);
1299 let out = input.entries().send_bincode_external(&external);
1300 complete_sink.complete(
1301 first_node
1302 .source_iter::<(u64, ()), _>(q!([]))
1303 .into_keyed()
1304 .weaken_ordering(),
1305 );
1306
1307 let nodes = flow
1308 .with_process(&first_node, deployment.Localhost())
1309 .with_external(&external, deployment.Localhost())
1310 .deploy(&mut deployment);
1311
1312 deployment.deploy().await.unwrap();
1313
1314 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1316 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1317 let mut external_out = nodes.connect(out).await;
1318
1319 deployment.start().await.unwrap();
1320
1321 external_in_2.send(456).await.unwrap();
1322
1323 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1324 }
1325
1326 #[tokio::test]
1327 async fn multi_external_bytes() {
1328 let mut deployment = Deployment::new();
1329
1330 let mut flow = FlowBuilder::new();
1331 let first_node = flow.process::<()>();
1332 let external = flow.external::<()>();
1333
1334 let (in_port, input, _membership, complete_sink) = first_node
1335 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1336 let out = input.entries().send_bincode_external(&external);
1337 complete_sink.complete(
1338 first_node
1339 .source_iter(q!([]))
1340 .into_keyed()
1341 .weaken_ordering(),
1342 );
1343
1344 let nodes = flow
1345 .with_process(&first_node, deployment.Localhost())
1346 .with_external(&external, deployment.Localhost())
1347 .deploy(&mut deployment);
1348
1349 deployment.deploy().await.unwrap();
1350
1351 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1352 let mut external_in_2 = nodes.connect(in_port).await.1;
1353 let external_out = nodes.connect(out).await;
1354
1355 deployment.start().await.unwrap();
1356
1357 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1358 external_in_2.send(vec![4, 5].into()).await.unwrap();
1359
1360 assert_eq!(
1361 external_out.take(2).collect::<HashSet<_>>().await,
1362 vec![
1363 (0, (&[1u8, 2, 3] as &[u8]).into()),
1364 (1, (&[4u8, 5] as &[u8]).into())
1365 ]
1366 .into_iter()
1367 .collect()
1368 );
1369 }
1370
1371 #[tokio::test]
1372 async fn single_client_external_bytes() {
1373 let mut deployment = Deployment::new();
1374 let mut flow = FlowBuilder::new();
1375 let first_node = flow.process::<()>();
1376 let external = flow.external::<()>();
1377 let (port, input, complete_sink) = first_node
1378 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1379 complete_sink.complete(input.map(q!(|data| {
1380 let mut resp: Vec<u8> = data.into();
1381 resp.push(42);
1382 resp.into() })));
1384
1385 let nodes = flow
1386 .with_process(&first_node, deployment.Localhost())
1387 .with_external(&external, deployment.Localhost())
1388 .deploy(&mut deployment);
1389
1390 deployment.deploy().await.unwrap();
1391 deployment.start().await.unwrap();
1392
1393 let (mut external_out, mut external_in) = nodes.connect(port).await;
1394
1395 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1396 assert_eq!(
1397 external_out.next().await.unwrap().unwrap(),
1398 vec![1, 2, 3, 42]
1399 );
1400 }
1401
1402 #[tokio::test]
1403 async fn echo_external_bytes() {
1404 let mut deployment = Deployment::new();
1405
1406 let mut flow = FlowBuilder::new();
1407 let first_node = flow.process::<()>();
1408 let external = flow.external::<()>();
1409
1410 let (port, input, _membership, complete_sink) = first_node
1411 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1412 complete_sink
1413 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1414
1415 let nodes = flow
1416 .with_process(&first_node, deployment.Localhost())
1417 .with_external(&external, deployment.Localhost())
1418 .deploy(&mut deployment);
1419
1420 deployment.deploy().await.unwrap();
1421
1422 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1423 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1424
1425 deployment.start().await.unwrap();
1426
1427 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1428 external_in_2.send(vec![4, 5].into()).await.unwrap();
1429
1430 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1431 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1432 }
1433
1434 #[tokio::test]
1435 async fn echo_external_bincode() {
1436 let mut deployment = Deployment::new();
1437
1438 let mut flow = FlowBuilder::new();
1439 let first_node = flow.process::<()>();
1440 let external = flow.external::<()>();
1441
1442 let (port, input, _membership, complete_sink) =
1443 first_node.bidi_external_many_bincode(&external);
1444 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1445
1446 let nodes = flow
1447 .with_process(&first_node, deployment.Localhost())
1448 .with_external(&external, deployment.Localhost())
1449 .deploy(&mut deployment);
1450
1451 deployment.deploy().await.unwrap();
1452
1453 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1454 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1455
1456 deployment.start().await.unwrap();
1457
1458 external_in_1.send("hi".to_owned()).await.unwrap();
1459 external_in_2.send("hello".to_owned()).await.unwrap();
1460
1461 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1462 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1463 }
1464}