Skip to content

Environments

AsyncEnvironment

Bases: Environment, ABC

Source code in adala/environments/base.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class AsyncEnvironment(Environment, ABC):
    """
    Asynchronous counterpart of `Environment`: the same hooks, declared as coroutines.

    This class only declares an interface. Every method below is abstract and has
    no implementation here, so concrete subclasses must override all of them.
    In addition to the hooks declared on `Environment`, it declares `set_predictions`.
    """

    @abstractmethod
    async def initialize(self):
        """
        Initialize the environment, e.g. by connecting to a database, reading a file into memory or starting a stream.

        Abstract coroutine with no implementation in this class; subclasses must override it.
        """

    @abstractmethod
    async def finalize(self):
        """
        Finalize the environment, e.g. by closing a database connection, closing a file or stopping a stream.

        Abstract coroutine with no implementation in this class; subclasses must override it.
        """

    @abstractmethod
    async def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame:
        """
        Get a batch of data from the data stream to be processed by the skill set.

        Args:
            batch_size (Optional[int]): The size of the batch. The argument is required (the
                signature declares no default). `None` is accepted by the type hint; how it is
                interpreted is not defined here and is left to the implementing subclass.

        Returns:
            InternalDataFrame: The data batch.
        """

    @abstractmethod
    async def get_feedback(
        self,
        skills: SkillSet,
        predictions: InternalDataFrame,
        num_feedbacks: Optional[int] = None,
    ) -> EnvironmentFeedback:
        """
        Request feedback for the predictions.

        Args:
            skills (SkillSet): The set of skills/models whose predictions are being evaluated.
            predictions (InternalDataFrame): The predictions to compare with the ground truth.
            num_feedbacks (Optional[int], optional): The number of feedbacks to request.
                Defaults to None, which is documented as requesting feedback for all predictions.

        Returns:
            EnvironmentFeedback: The resulting ground truth signal, with matches and errors detailed.
        """

    @abstractmethod
    async def set_predictions(self, predictions: InternalDataFrame):
        """
        Push predictions back to the environment.

        Args:
            predictions (InternalDataFrame): The predictions to push to the environment.
        """

    @abstractmethod
    async def save(self):
        """
        Save the current state of the environment.

        Abstract coroutine with no implementation in this class; subclasses must override it.
        """

    @abstractmethod
    async def restore(self):
        """
        Restore the state of the environment.

        Abstract coroutine with no implementation in this class; subclasses must override it.
        """

    # Model configuration. NOTE(review): presumably a pydantic-style Config that lets
    # fields use types the validator does not know natively - confirm against the base model.
    class Config:
        arbitrary_types_allowed = True

finalize() abstractmethod async

Finalize the environment, e.g by closing a database connection, closing a file or stopping a stream.

Raises:

Type Description
NotImplementedError

This method is abstract and has no implementation in AsyncEnvironment; subclasses must provide one.

Source code in adala/environments/base.py
145
146
147
148
149
150
151
152
@abstractmethod
async def finalize(self):
    """
    Finalize the environment, e.g. by closing a database connection, closing a file or stopping a stream.

    Abstract coroutine with no implementation here; subclasses must override it.
    """

get_data_batch(batch_size) abstractmethod async

Get a batch of data from data stream to be processed by the skill set.

Parameters:

Name Type Description Default
batch_size Optional[int]

The size of the batch. None is accepted by the type hint, but the parameter has no default and must be passed explicitly.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The data batch.

Source code in adala/environments/base.py
154
155
156
157
158
159
160
161
162
163
164
@abstractmethod
async def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame:
    """
    Get a batch of data from the data stream to be processed by the skill set.

    Args:
        batch_size (Optional[int]): The size of the batch. The argument is required (the
            signature declares no default). `None` is accepted by the type hint; how it is
            interpreted is not defined here and is left to the implementing subclass.

    Returns:
        InternalDataFrame: The data batch.
    """

get_feedback(skills, predictions, num_feedbacks=None) abstractmethod async

Request feedback for the predictions.

Parameters:

Name Type Description Default
skills SkillSet

The set of skills/models whose predictions are being evaluated.

required
predictions InternalDataFrame

The predictions to compare with the ground truth.

required
num_feedbacks Optional[int]

The number of feedbacks to request. Defaults to all predictions

None

Returns:

Type Description
EnvironmentFeedback

The resulting ground truth signal, with matches and errors detailed.

Source code in adala/environments/base.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
@abstractmethod
async def get_feedback(
    self,
    skills: SkillSet,
    predictions: InternalDataFrame,
    num_feedbacks: Optional[int] = None,
) -> EnvironmentFeedback:
    """
    Request feedback for the predictions.

    Args:
        skills (SkillSet): The set of skills/models whose predictions are being evaluated.
        predictions (InternalDataFrame): The predictions to compare with the ground truth.
        num_feedbacks (Optional[int], optional): The number of feedbacks to request.
            Defaults to None, which is documented as requesting feedback for all predictions.

    Returns:
        EnvironmentFeedback: The resulting ground truth signal, with matches and errors detailed.
    """

initialize() abstractmethod async

Initialize the environment, e.g by connecting to a database, reading file to memory or starting a stream.

Raises:

Type Description
NotImplementedError

This method is abstract and has no implementation in AsyncEnvironment; subclasses must provide one.

Source code in adala/environments/base.py
136
137
138
139
140
141
142
143
@abstractmethod
async def initialize(self):
    """
    Initialize the environment, e.g. by connecting to a database, reading a file into memory or starting a stream.

    Abstract coroutine with no implementation here; subclasses must override it.
    """

restore() abstractmethod async

Restore the state of the environment.

Raises:

Type Description
NotImplementedError

This method is abstract and has no implementation in AsyncEnvironment; subclasses must provide one.

Source code in adala/environments/base.py
202
203
204
205
206
207
208
209
@abstractmethod
async def restore(self):
    """
    Restore the state of the environment.

    Abstract coroutine with no implementation here; subclasses must override it.
    """

save() abstractmethod async

Save the current state of the environment.

Raises:

Type Description
NotImplementedError

This method is abstract and has no implementation in AsyncEnvironment; subclasses must provide one.

Source code in adala/environments/base.py
193
194
195
196
197
198
199
200
@abstractmethod
async def save(self):
    """
    Save the current state of the environment.

    Abstract coroutine with no implementation here; subclasses must override it.
    """

set_predictions(predictions) abstractmethod async

Push predictions back to the environment.

Parameters:

Name Type Description Default
predictions InternalDataFrame

The predictions to push to the environment.

required

Source code in adala/environments/base.py
184
185
186
187
188
189
190
191
@abstractmethod
async def set_predictions(self, predictions: InternalDataFrame):
    """
    Push predictions back to the environment.

    Abstract coroutine with no implementation here; subclasses must override it.

    Args:
        predictions (InternalDataFrame): The predictions to push to the environment.
    """

Environment

Bases: BaseModelInRegistry

An abstract base class that defines the structure and required methods for an environment in which machine learning models operate and are evaluated against ground truth data.

Subclasses should implement methods to handle feedback requests, comparison to ground truth, dataset conversion, and state persistence.

Source code in adala/environments/base.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class Environment(BaseModelInRegistry):
    """
    An abstract base class that defines the structure and required methods for an environment
    in which machine learning models operate and are evaluated against ground truth data.

    Subclasses should implement the lifecycle hooks (`initialize` / `finalize`), data access
    (`get_data_batch`), feedback requests (`get_feedback`) and state persistence
    (`save` / `restore`). None of these methods has an implementation in this class.
    """

    @abstractmethod
    def initialize(self):
        """
        Initialize the environment, e.g. by connecting to a database, reading a file into memory or starting a stream.

        Declared abstract with no implementation in this class; subclasses are expected to override it.
        """

    @abstractmethod
    def finalize(self):
        """
        Finalize the environment, e.g. by closing a database connection, writing memory to a file or stopping a stream.

        Declared abstract with no implementation in this class; subclasses are expected to override it.
        """

    @abstractmethod
    def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame:
        """
        Get a batch of data from the data stream to be processed by the skill set.

        Args:
            batch_size (Optional[int]): The size of the batch. The argument is required (the
                signature declares no default). `None` is accepted by the type hint; how it is
                interpreted is not defined here and is left to the implementing subclass.

        Returns:
            InternalDataFrame: The data batch.
        """

    @abstractmethod
    def get_feedback(
        self,
        skills: SkillSet,
        predictions: InternalDataFrame,
        num_feedbacks: Optional[int] = None,
    ) -> EnvironmentFeedback:
        """
        Request feedback for the predictions.

        Args:
            skills (SkillSet): The set of skills/models whose predictions are being evaluated.
            predictions (InternalDataFrame): The predictions to compare with the ground truth.
            num_feedbacks (Optional[int], optional): The number of feedbacks to request.
                Defaults to None, which is documented as requesting feedback for all predictions.

        Returns:
            EnvironmentFeedback: The resulting ground truth signal, with matches and errors detailed.
        """

    @abstractmethod
    def save(self):
        """
        Save the current state of the environment.

        Declared abstract with no implementation in this class; subclasses are expected to override it.
        """

    @abstractmethod
    def restore(self):
        """
        Restore the state of the environment.

        Declared abstract with no implementation in this class; subclasses are expected to override it.
        """

    # Model configuration. NOTE(review): presumably a pydantic-style Config that lets
    # fields use types the validator does not know natively - confirm against BaseModelInRegistry.
    class Config:
        arbitrary_types_allowed = True

finalize() abstractmethod

Finalize the environment, e.g by closing a database connection, writing memory to file or stopping a stream.

Raises:

Type Description
NotImplementedError

This method is abstract and has no implementation in Environment; subclasses must provide one.

Source code in adala/environments/base.py
74
75
76
77
78
79
80
81
@abstractmethod
def finalize(self):
    """
    Finalize the environment, e.g. by closing a database connection, writing memory to a file or stopping a stream.

    Declared abstract with no implementation here; subclasses are expected to override it.
    """

get_data_batch(batch_size) abstractmethod

Get a batch of data from data stream to be processed by the skill set.

Parameters:

Name Type Description Default
batch_size Optional[int]

The size of the batch. None is accepted by the type hint, but the parameter has no default and must be passed explicitly.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The data batch.

Source code in adala/environments/base.py
83
84
85
86
87
88
89
90
91
92
93
@abstractmethod
def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame:
    """
    Get a batch of data from the data stream to be processed by the skill set.

    Args:
        batch_size (Optional[int]): The size of the batch. The argument is required (the
            signature declares no default). `None` is accepted by the type hint; how it is
            interpreted is not defined here and is left to the implementing subclass.

    Returns:
        InternalDataFrame: The data batch.
    """

get_feedback(skills, predictions, num_feedbacks=None) abstractmethod

Request feedback for the predictions.

Parameters:

Name Type Description Default
skills SkillSet

The set of skills/models whose predictions are being evaluated.

required
predictions InternalDataFrame

The predictions to compare with the ground truth.

required
num_feedbacks Optional[int]

The number of feedbacks to request. Defaults to all predictions

None

Returns:

Type Description
EnvironmentFeedback

The resulting ground truth signal, with matches and errors detailed.

Source code in adala/environments/base.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@abstractmethod
def get_feedback(
    self,
    skills: SkillSet,
    predictions: InternalDataFrame,
    num_feedbacks: Optional[int] = None,
) -> EnvironmentFeedback:
    """
    Request feedback for the predictions.

    Args:
        skills (SkillSet): The set of skills/models whose predictions are being evaluated.
        predictions (InternalDataFrame): The predictions to compare with the ground truth.
        num_feedbacks (Optional[int], optional): The number of feedbacks to request.
            Defaults to None, which is documented as requesting feedback for all predictions.

    Returns:
        EnvironmentFeedback: The resulting ground truth signal, with matches and errors detailed.
    """

initialize() abstractmethod

Initialize the environment, e.g by connecting to a database, reading file to memory or starting a stream.

Raises:

Type Description
NotImplementedError

This method is abstract and has no implementation in Environment; subclasses must provide one.

Source code in adala/environments/base.py
65
66
67
68
69
70
71
72
@abstractmethod
def initialize(self):
    """
    Initialize the environment, e.g. by connecting to a database, reading a file into memory or starting a stream.

    Declared abstract with no implementation here; subclasses are expected to override it.
    """

restore() abstractmethod

Restore the state of the environment.

Raises:

Type Description
NotImplementedError

This method is abstract and has no implementation in Environment; subclasses must provide one.

Source code in adala/environments/base.py
122
123
124
125
126
127
128
129
@abstractmethod
def restore(self):
    """
    Restore the state of the environment.

    Declared abstract with no implementation here; subclasses are expected to override it.
    """

save() abstractmethod

Save the current state of the environment.

Raises:

Type Description
NotImplementedError

This method is abstract and has no implementation in Environment; subclasses must provide one.

Source code in adala/environments/base.py
113
114
115
116
117
118
119
120
@abstractmethod
def save(self):
    """
    Save the current state of the environment.

    Declared abstract with no implementation here; subclasses are expected to override it.
    """

EnvironmentFeedback

Bases: BaseModel

A class that represents the feedback received from an environment, along with the calculated correctness of predictions.

Attributes:

Name Type Description
match InternalDataFrame

A DataFrame indicating the correctness of predictions. Each row corresponds to a prediction, and each column is a boolean indicating if skill matches ground truth. Columns are named after the skill names. Indices correspond to prediction indices. Example:

| index | skill_1 | skill_2 | skill_3 |
|-------|---------|---------|---------|
| 0     | True    | True    | False   |
| 1     | False   | False   | False   |
| 2     | True    | True    | True    |

feedback InternalDataFrame

A DataFrame that contains ground truth feedback per each skill output

Source code in adala/environments/base.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class EnvironmentFeedback(BaseModel):
    """
    Feedback returned by an environment, paired with the correctness of each prediction.

    Attributes:
        match (InternalDataFrame): Correctness of the predictions, one row per prediction
            and one boolean column per skill (columns carry the skill names, the index
            carries the prediction indices).
            Example:
                ```
                | index | skill_1 | skill_2 | skill_3 |
                |-------|---------|---------|---------|
                | 0     | True    | True    | False   |
                | 1     | False   | False   | False   |
                | 2     | True    | True    | True    |
                ```
        feedback (InternalDataFrame): Ground truth feedback for each skill output.
    """

    match: InternalDataFrame
    feedback: InternalDataFrame

    class Config:
        arbitrary_types_allowed = True

    def get_accuracy(self) -> InternalSeries:
        """
        Compute the accuracy of the predictions as the mean of `match`.

        Returns:
            InternalSeries: The result of `self.match.mean()`.
        """
        accuracy = self.match.mean()
        return accuracy

    def __rich__(self):
        """
        Build the markup text shown when this object is rendered.

        The text always has a header and a "Match" section; a "Feedback" section
        is appended only when `self.feedback` is not None.
        """
        sections = [
            "[bold blue]Environment Feedback:[/bold blue]\n\n",
            f"\n[bold]Match[/bold]\n{self.match}",
        ]
        if self.feedback is not None:
            sections.append(f"\n[bold]Feedback[/bold]\n{self.feedback}")
        return "".join(sections)

get_accuracy()

Calculate the accuracy of predictions as the mean of matches.

Returns:

Name Type Description
InternalSeries InternalSeries

A series representing the accuracy of predictions.

Source code in adala/environments/base.py
39
40
41
42
43
44
45
46
def get_accuracy(self) -> InternalSeries:
    """
    Compute the accuracy of the predictions as the mean of `match`.

    Returns:
        InternalSeries: The result of `self.match.mean()`.
    """
    accuracy = self.match.mean()
    return accuracy