Skip to main content

hydro_lang/
forward_handle.rs

1//! Mechanisms for introducing forward references and cycles in Hydro.
2
3use sealed::sealed;
4
5use crate::compile::builder::CycleId;
6use crate::location::Location;
7use crate::location::dynamic::LocationId;
8use crate::staging_util::Invariant;
9
10#[sealed]
11pub(crate) trait ReceiverKind {}
12
13/// Marks that the [`ForwardHandle`] is for a "forward reference" to a later-defined collection.
14///
15/// When the handle is completed, the provided collection must not depend _synchronously_
16/// (in the same tick) on the forward reference that was created earlier.
17pub enum ForwardRef {}
18
19#[sealed]
20impl ReceiverKind for ForwardRef {}
21
22/// Marks that the [`ForwardHandle`] will send a live collection to the next tick.
23///
24/// Dependency cycles are permitted for this handle type, because the collection used
25/// to complete this handle will appear on the source-side on the _next_ tick.
26pub enum TickCycle {}
27
28#[sealed]
29impl ReceiverKind for TickCycle {}
30
31pub(crate) trait ReceiverComplete<'a, Marker>
32where
33    Marker: ReceiverKind,
34{
35    fn complete(self, cycle_id: CycleId, expected_location: LocationId);
36}
37
38pub(crate) trait CycleCollection<'a, Kind>: ReceiverComplete<'a, Kind>
39where
40    Kind: ReceiverKind,
41{
42    type Location: Location<'a>;
43
44    fn create_source(id: CycleId, location: Self::Location) -> Self;
45}
46
47pub(crate) trait CycleCollectionWithInitial<'a, Kind>: ReceiverComplete<'a, Kind>
48where
49    Kind: ReceiverKind,
50{
51    type Location: Location<'a>;
52
53    fn create_source_with_initial(
54        cycle_id: CycleId,
55        initial: Self,
56        location: Self::Location,
57    ) -> Self;
58}
59
60/// A handle that can be used to fulfill a forward reference.
61///
62/// The `C` type parameter specifies the collection type that can be used to complete the handle.
63#[expect(
64    private_bounds,
65    reason = "only Hydro collections can implement ReceiverComplete"
66)]
67pub struct ForwardHandle<'a, C: ReceiverComplete<'a, ForwardRef>> {
68    completed: bool,
69    cycle_id: CycleId,
70    expected_location: LocationId,
71    _phantom: Invariant<'a, C>,
72}
73
74#[expect(
75    private_bounds,
76    reason = "only Hydro collections can implement ReceiverComplete"
77)]
78impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
79    pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
80        Self {
81            completed: false,
82            cycle_id,
83            expected_location,
84            _phantom: std::marker::PhantomData,
85        }
86    }
87}
88
89impl<'a, C: ReceiverComplete<'a, ForwardRef>> Drop for ForwardHandle<'a, C> {
90    fn drop(&mut self) {
91        if !self.completed && !std::thread::panicking() {
92            panic!("ForwardHandle dropped without being completed");
93        }
94    }
95}
96
97#[expect(
98    private_bounds,
99    reason = "only Hydro collections can implement ReceiverComplete"
100)]
101impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
102    /// Completes the forward reference with the given live collection. The initial forward reference
103    /// collection created in [`Location::forward_ref`] will resolve to this value.
104    ///
105    /// The provided value **must not** depend _synchronously_ (in the same tick) on the forward reference
106    /// collection, as doing so would create a dependency cycle. Asynchronous cycles (outside a tick) are
107    /// allowed, since the program can continue running while the cycle is processed.
108    pub fn complete(mut self, stream: impl Into<C>) {
109        self.completed = true;
110        C::complete(stream.into(), self.cycle_id, self.expected_location.clone())
111    }
112}
113
114/// A handle that can be used to complete a tick cycle by sending a collection to the next tick.
115///
116/// The `C` type parameter specifies the collection type that can be used to complete the handle.
117#[expect(
118    private_bounds,
119    reason = "only Hydro collections can implement ReceiverComplete"
120)]
121pub struct TickCycleHandle<'a, C: ReceiverComplete<'a, TickCycle>> {
122    completed: bool,
123    cycle_id: CycleId,
124    expected_location: LocationId,
125    _phantom: Invariant<'a, C>,
126}
127
128#[expect(
129    private_bounds,
130    reason = "only Hydro collections can implement ReceiverComplete"
131)]
132impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
133    pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
134        Self {
135            completed: false,
136            cycle_id,
137            expected_location,
138            _phantom: std::marker::PhantomData,
139        }
140    }
141}
142
143impl<'a, C: ReceiverComplete<'a, TickCycle>> Drop for TickCycleHandle<'a, C> {
144    fn drop(&mut self) {
145        if !self.completed && !std::thread::panicking() {
146            panic!("TickCycleHandle dropped without being completed");
147        }
148    }
149}
150
151#[expect(
152    private_bounds,
153    reason = "only Hydro collections can implement ReceiverComplete"
154)]
155impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
156    /// Sends the provided collection to the next tick, where it will be materialized
157    /// in the collection returned by [`crate::location::Tick::cycle`] or
158    /// [`crate::location::Tick::cycle_with_initial`].
159    pub fn complete_next_tick(mut self, stream: impl Into<C>) {
160        self.completed = true;
161        C::complete(stream.into(), self.cycle_id, self.expected_location.clone())
162    }
163}
164
165/// A trait for completing a cycle handle with a state value.
166/// Used internally by the `sliced!` macro for state management.
167#[doc(hidden)]
168pub trait CompleteCycle<S> {
169    /// Completes the cycle with the given state value.
170    fn complete_next_tick(self, state: S);
171}
172
173impl<'a, C: ReceiverComplete<'a, TickCycle>> CompleteCycle<C> for TickCycleHandle<'a, C> {
174    fn complete_next_tick(self, state: C) {
175        TickCycleHandle::complete_next_tick(self, state)
176    }
177}