Skip to content

Runtimes

AsyncRuntime

Bases: Runtime

Async version of runtime that uses asyncio to process batch of records.

Source code in adala/runtimes/base.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class AsyncRuntime(Runtime):
    """Async version of runtime that uses asyncio to process batch of records."""

    @abstractmethod
    async def record_to_record(
        self,
        record: Dict[str, str],
        input_template: str,
        instructions_template: str,
        output_template: str,
        extra_fields: Optional[Dict[str, Any]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = True,
    ) -> Dict[str, str]:
        """
        Processes a record.

        Args:
            record (Dict[str, str]): The record to process.
            input_template (str): The input template.
            instructions_template (str): The instructions template.
            output_template (str): The output template.
            extra_fields (Optional[Dict[str, Any]]): Extra fields to use in the templates. Defaults to None.
            field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
                i.e. analogous to {"field_n": {"type": "string"}}.
            instructions_first (bool): Whether to put instructions first. Defaults to True.

        Returns:
            Dict[str, str]: The processed record.
        """

    @abstractmethod
    async def batch_to_batch(
        self,
        batch: InternalDataFrame,
        input_template: str,
        instructions_template: str,
        output_template: str,
        extra_fields: Optional[Dict[str, str]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = True,
    ) -> InternalDataFrame:
        """
        Processes a batch of records.

        Args:
            batch (InternalDataFrame): The batch to process.
            input_template (str): The input template.
            instructions_template (str): The instructions template.
            output_template (str): The output template.
            extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None.
            field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
                i.e. analogous to {"field_n": {"type": "string"}}.
            instructions_first (bool): Whether to put instructions first. Defaults to True.

        Returns:
            InternalDataFrame: The processed batch.
        """
        # FIX: the previous default used ``batch.progress_apply(self.record_to_record, ...)``.
        # ``record_to_record`` is an async coroutine function, so progress_apply would
        # collect unawaited coroutine objects instead of results. Await every row's
        # coroutine concurrently and rebuild the frame with the original index.
        import asyncio

        records = batch.to_dict(orient="records")
        outputs = await asyncio.gather(
            *(
                self.record_to_record(
                    record=record,
                    input_template=input_template,
                    instructions_template=instructions_template,
                    output_template=output_template,
                    extra_fields=extra_fields,
                    field_schema=field_schema,
                    instructions_first=instructions_first,
                )
                for record in records
            )
        )
        return InternalDataFrame(outputs, index=batch.index)

batch_to_batch(batch, input_template, instructions_template, output_template, extra_fields=None, field_schema=None, instructions_first=True) abstractmethod async

Processes a batch of records.

Parameters:

Name Type Description Default
batch InternalDataFrame

The batch to process.

required
input_template str

The input template.

required
instructions_template str

The instructions template.

required
output_template str

The output template.

required
extra_fields Optional[Dict[str, str]]

Extra fields to use in the templates. Defaults to None.

None
field_schema Optional[Dict]

Field JSON schema to use in the templates. Defaults to all fields are strings, i.e. analogous to {"field_n": {"type": "string"}}.

None
instructions_first bool

Whether to put instructions first. Defaults to True.

True

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The processed batch.

Source code in adala/runtimes/base.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
@abstractmethod
async def batch_to_batch(
    self,
    batch: InternalDataFrame,
    input_template: str,
    instructions_template: str,
    output_template: str,
    extra_fields: Optional[Dict[str, str]] = None,
    field_schema: Optional[Dict] = None,
    instructions_first: bool = True,
) -> InternalDataFrame:
    """
    Processes a batch of records.

    Args:
        batch (InternalDataFrame): The batch to process.
        input_template (str): The input template.
        instructions_template (str): The instructions template.
        output_template (str): The output template.
        extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None.
        field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
            i.e. analogous to {"field_n": {"type": "string"}}.
        instructions_first (bool): Whether to put instructions first. Defaults to True.

    Returns:
        InternalDataFrame: The processed batch.
    """
    # NOTE(review): record_to_record is declared ``async``, so progress_apply
    # collects unawaited coroutine objects here rather than results — confirm
    # that concrete subclasses are expected to override this default body.
    output = batch.progress_apply(
        self.record_to_record,
        axis=1,
        result_type="expand",
        input_template=input_template,
        instructions_template=instructions_template,
        output_template=output_template,
        extra_fields=extra_fields,
        field_schema=field_schema,
        instructions_first=instructions_first,
    )
    return output

record_to_record(record, input_template, instructions_template, output_template, extra_fields=None, field_schema=None, instructions_first=True) abstractmethod async

Processes a record.

Parameters:

Name Type Description Default
record Dict[str, str]

The record to process.

required
input_template str

The input template.

required
instructions_template str

The instructions template.

required
output_template str

The output template.

required
extra_fields Optional[Dict[str, str]]

Extra fields to use in the templates. Defaults to None.

None
field_schema Optional[Dict]

Field JSON schema to use in the templates. Defaults to all fields are strings, i.e. analogous to {"field_n": {"type": "string"}}.

None
instructions_first bool

Whether to put instructions first. Defaults to True.

True

Returns:

Type Description
Dict[str, str]

Dict[str, str]: The processed record.

Source code in adala/runtimes/base.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
@abstractmethod
async def record_to_record(
    self,
    record: Dict[str, str],
    input_template: str,
    instructions_template: str,
    output_template: str,
    extra_fields: Optional[Dict[str, Any]] = None,
    field_schema: Optional[Dict] = None,
    instructions_first: bool = True,
) -> Dict[str, str]:
    """
    Processes a single record asynchronously.

    Args:
        record (Dict[str, str]): The record to process.
        input_template (str): The input template.
        instructions_template (str): The instructions template.
        output_template (str): The output template.
        extra_fields (Optional[Dict[str, Any]]): Extra fields to use in the templates. Defaults to None.
        field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
            i.e. analogous to {"field_n": {"type": "string"}}.
        instructions_first (bool): Whether to put instructions first. Defaults to True.

    Returns:
        Dict[str, str]: The processed record.
    """

Runtime

Bases: BaseModelInRegistry

Base class representing a generic runtime environment.

Attributes:

Name Type Description
verbose bool

Flag indicating if runtime outputs should be verbose. Defaults to False.

batch_size Optional[int]

The batch size to use for processing records. Defaults to None.

Source code in adala/runtimes/base.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class Runtime(BaseModelInRegistry):
    """
    Base class representing a generic runtime environment.

    Attributes:
        verbose (bool): Flag indicating if runtime outputs should be verbose. Defaults to False.
        batch_size (Optional[int]): The batch size to use for processing records. Defaults to None.
    """

    verbose: bool = False
    batch_size: Optional[int] = None

    @model_validator(mode="after")
    def init_runtime(self) -> "Runtime":
        """Hook for validating and initializing the runtime instance.

        Runs automatically after pydantic model construction; subclasses may
        override it to perform their own setup.

        Returns:
            Runtime: The initialized runtime instance.
        """
        return self

    @abstractmethod
    def record_to_record(
        self,
        record: Dict[str, str],
        input_template: str,
        instructions_template: str,
        output_template: str,
        extra_fields: Optional[Dict[str, Any]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = True,
    ) -> Dict[str, str]:
        """
        Process a single record.

        Args:
            record (Dict[str, str]): The record to process.
            input_template (str): The input template.
            instructions_template (str): The instructions template.
            output_template (str): The output template.
            extra_fields (Optional[Dict[str, Any]]): Extra fields to use in the templates. Defaults to None.
            field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
                i.e. analogous to {"field_n": {"type": "string"}}.
            instructions_first (bool): Whether to put instructions first. Defaults to True.

        Returns:
            Dict[str, str]: The processed record.
        """

    def batch_to_batch(
        self,
        batch: InternalDataFrame,
        input_template: str,
        instructions_template: str,
        output_template: str,
        extra_fields: Optional[Dict[str, str]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = True,
    ) -> InternalDataFrame:
        """
        Process a batch of records by applying ``record_to_record`` row by row.

        Args:
            batch (InternalDataFrame): The batch to process.
            input_template (str): The input template.
            instructions_template (str): The instructions template.
            output_template (str): The output template.
            extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None.
            field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
                i.e. analogous to {"field_n": {"type": "string"}}.
            instructions_first (bool): Whether to put instructions first. Defaults to True.

        Returns:
            InternalDataFrame: The processed batch.
        """
        # Forward all template-related arguments unchanged to record_to_record;
        # result_type="expand" spreads each returned dict into columns.
        row_kwargs = dict(
            input_template=input_template,
            instructions_template=instructions_template,
            output_template=output_template,
            extra_fields=extra_fields,
            field_schema=field_schema,
            instructions_first=instructions_first,
        )
        return batch.progress_apply(
            self.record_to_record, axis=1, result_type="expand", **row_kwargs
        )

    def record_to_batch(
        self,
        record: Dict[str, str],
        input_template: str,
        instructions_template: str,
        output_template: str,
        output_batch_size: int = 1,
        extra_fields: Optional[Dict[str, str]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = True,
    ) -> InternalDataFrame:
        """
        Process a single record and return a batch of ``output_batch_size`` rows.

        Args:
            record (Dict[str, str]): The record to process.
            input_template (str): The input template.
            instructions_template (str): The instructions template.
            output_template (str): The output template.
            output_batch_size (int): The batch size for the output. Defaults to 1.
            extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None.
            field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
                i.e. analogous to {"field_n": {"type": "string"}}.
            instructions_first (bool): Whether to put instructions first. Defaults to True.

        Returns:
            InternalDataFrame: The processed batch.
        """
        # Replicate the record into output_batch_size rows, then delegate.
        replicated = InternalDataFrame([record for _ in range(output_batch_size)])
        return self.batch_to_batch(
            batch=replicated,
            input_template=input_template,
            instructions_template=instructions_template,
            output_template=output_template,
            extra_fields=extra_fields,
            field_schema=field_schema,
            instructions_first=instructions_first,
        )

batch_to_batch(batch, input_template, instructions_template, output_template, extra_fields=None, field_schema=None, instructions_first=True)

Processes a batch of records.

Parameters:

Name Type Description Default
batch InternalDataFrame

The batch to process.

required
input_template str

The input template.

required
instructions_template str

The instructions template.

required
output_template str

The output template.

required
extra_fields Optional[Dict[str, str]]

Extra fields to use in the templates. Defaults to None.

None
field_schema Optional[Dict]

Field JSON schema to use in the templates. Defaults to all fields are strings, i.e. analogous to {"field_n": {"type": "string"}}.

None
instructions_first bool

Whether to put instructions first. Defaults to True.

True

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The processed batch.

Source code in adala/runtimes/base.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def batch_to_batch(
    self,
    batch: InternalDataFrame,
    input_template: str,
    instructions_template: str,
    output_template: str,
    extra_fields: Optional[Dict[str, str]] = None,
    field_schema: Optional[Dict] = None,
    instructions_first: bool = True,
) -> InternalDataFrame:
    """
    Processes a batch of records.

    Args:
        batch (InternalDataFrame): The batch to process.
        input_template (str): The input template.
        instructions_template (str): The instructions template.
        output_template (str): The output template.
        extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None.
        field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
            i.e. analogous to {"field_n": {"type": "string"}}.
        instructions_first (bool): Whether to put instructions first. Defaults to True.

    Returns:
        InternalDataFrame: The processed batch.
    """
    # Apply record_to_record row-wise; result_type="expand" spreads each
    # returned dict into one column per output field.
    output = batch.progress_apply(
        self.record_to_record,
        axis=1,
        result_type="expand",
        input_template=input_template,
        instructions_template=instructions_template,
        output_template=output_template,
        extra_fields=extra_fields,
        field_schema=field_schema,
        instructions_first=instructions_first,
    )
    return output

init_runtime()

Initializes the runtime.

This method should be used to validate and potentially initialize the runtime instance.

Returns:

Name Type Description
Runtime Runtime

The initialized runtime instance.

Source code in adala/runtimes/base.py
23
24
25
26
27
28
29
30
31
32
@model_validator(mode="after")
def init_runtime(self) -> "Runtime":
    """Initializes the runtime.

    This method should be used to validate and potentially initialize the runtime instance.

    Returns:
        Runtime: The initialized runtime instance.
    """
    return self

record_to_batch(record, input_template, instructions_template, output_template, output_batch_size=1, extra_fields=None, field_schema=None, instructions_first=True)

Processes a record and return a batch.

Parameters:

Name Type Description Default
record Dict[str, str]

The record to process.

required
input_template str

The input template.

required
instructions_template str

The instructions template.

required
output_template str

The output template.

required
output_batch_size int

The batch size for the output. Defaults to 1.

1
extra_fields Optional[Dict[str, str]]

Extra fields to use in the templates. Defaults to None.

None
field_schema Optional[Dict]

Field JSON schema to use in the templates. Defaults to all fields are strings, i.e. analogous to {"field_n": {"type": "string"}}.

None
instructions_first bool

Whether to put instructions first. Defaults to True.

True

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The processed batch.

Source code in adala/runtimes/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def record_to_batch(
    self,
    record: Dict[str, str],
    input_template: str,
    instructions_template: str,
    output_template: str,
    output_batch_size: int = 1,
    extra_fields: Optional[Dict[str, str]] = None,
    field_schema: Optional[Dict] = None,
    instructions_first: bool = True,
) -> InternalDataFrame:
    """
    Process a single record and return a batch of ``output_batch_size`` rows.

    Args:
        record (Dict[str, str]): The record to process.
        input_template (str): The input template.
        instructions_template (str): The instructions template.
        output_template (str): The output template.
        output_batch_size (int): The batch size for the output. Defaults to 1.
        extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None.
        field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
            i.e. analogous to {"field_n": {"type": "string"}}.
        instructions_first (bool): Whether to put instructions first. Defaults to True.

    Returns:
        InternalDataFrame: The processed batch.
    """
    # Replicate the record into output_batch_size rows and delegate to the
    # batch implementation.
    replicated = InternalDataFrame([record for _ in range(output_batch_size)])
    return self.batch_to_batch(
        batch=replicated,
        input_template=input_template,
        instructions_template=instructions_template,
        output_template=output_template,
        extra_fields=extra_fields,
        field_schema=field_schema,
        instructions_first=instructions_first,
    )

record_to_record(record, input_template, instructions_template, output_template, extra_fields=None, field_schema=None, instructions_first=True) abstractmethod

Processes a record.

Parameters:

Name Type Description Default
record Dict[str, str]

The record to process.

required
input_template str

The input template.

required
instructions_template str

The instructions template.

required
output_template str

The output template.

required
extra_fields Optional[Dict[str, str]]

Extra fields to use in the templates. Defaults to None.

None
field_schema Optional[Dict]

Field JSON schema to use in the templates. Defaults to all fields are strings, i.e. analogous to {"field_n": {"type": "string"}}.

None
instructions_first bool

Whether to put instructions first. Defaults to True.

True

Returns:

Type Description
Dict[str, str]

Dict[str, str]: The processed record.

Source code in adala/runtimes/base.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@abstractmethod
def record_to_record(
    self,
    record: Dict[str, str],
    input_template: str,
    instructions_template: str,
    output_template: str,
    extra_fields: Optional[Dict[str, Any]] = None,
    field_schema: Optional[Dict] = None,
    instructions_first: bool = True,
) -> Dict[str, str]:
    """
    Processes a single record.

    Args:
        record (Dict[str, str]): The record to process.
        input_template (str): The input template.
        instructions_template (str): The instructions template.
        output_template (str): The output template.
        extra_fields (Optional[Dict[str, Any]]): Extra fields to use in the templates. Defaults to None.
        field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings,
            i.e. analogous to {"field_n": {"type": "string"}}.
        instructions_first (bool): Whether to put instructions first. Defaults to True.

    Returns:
        Dict[str, str]: The processed record.
    """

AsyncOpenAIChatRuntime

Bases: AsyncRuntime

Runtime that uses OpenAI API and chat completion models to perform the skill. It uses async calls to OpenAI API.

Attributes:

Name Type Description
openai_model str

OpenAI model name.

openai_api_key Optional[str]

OpenAI API key. If not provided, will be taken from OPENAI_API_KEY environment variable.

max_tokens Optional[int]

Maximum number of tokens to generate. Defaults to 1000.

temperature Optional[float]

Temperature for sampling, between 0 and 1. Higher values means the model will take more risks. Try 0.9 for more creative applications, and 0 (argmax sampling) for ones with a well-defined answer. Defaults to 0.0.

concurrent_clients Optional[int]

Number of concurrent clients to OpenAI API. More clients means more parallel requests, but also more money spent and more chances to hit the rate limit. Defaults to 10.

Source code in adala/runtimes/_openai.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
class AsyncOpenAIChatRuntime(AsyncRuntime):
    """
    Runtime that uses [OpenAI API](https://openai.com/) and chat completion models to perform the skill.
    It uses async calls to OpenAI API.

    Attributes:
        openai_model: OpenAI model name.
        openai_api_key: OpenAI API key. If not provided, will be taken from OPENAI_API_KEY environment variable.
        max_tokens: Maximum number of tokens to generate. Defaults to 1000.
        temperature: Temperature for sampling, between 0 and 1. Higher values means the model will take more risks.
            Try 0.9 for more creative applications, and 0 (argmax sampling) for ones with a well-defined answer.
            Defaults to 0.0.

        concurrent_clients: Number of concurrent clients to OpenAI API. More clients means more parallel requests, but
            also more money spent and more chances to hit the rate limit. Defaults to 10.
        splitter: Optional splitter string; not referenced within this class — presumably consumed elsewhere. TODO confirm.
        timeout: Per-request timeout in seconds forwarded to the completion calls. Defaults to 10.
    """

    openai_model: str = Field(alias="model")
    # Default is resolved once at class-definition time from the environment.
    openai_api_key: Optional[str] = Field(
        default=os.getenv("OPENAI_API_KEY"), alias="api_key"
    )
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.0
    splitter: Optional[str] = None
    concurrent_clients: Optional[int] = 10
    timeout: Optional[int] = 10

    def init_runtime(self) -> "Runtime":
        """Verify that the configured model is retrievable with this API key.

        NOTE(review): this overrides the base ``@model_validator``-decorated
        hook with a plain method — confirm pydantic still invokes it
        automatically after construction.

        Raises:
            ValueError: If the model is not available for this account.
        """
        # check model availability
        try:
            _client = OpenAI(api_key=self.openai_api_key)
            _client.models.retrieve(self.openai_model)
        except NotFoundError:
            raise ValueError(
                f'Requested model "{self.openai_model}" is not available in your OpenAI account.'
            )
        return self

    def _prepare_prompt(
        self,
        row,
        input_template: str,
        instructions_template: str,
        suffix: str,
        extra_fields: dict,
    ) -> Dict[str, str]:
        """Prepare input prompt for OpenAI API from the row of the dataframe"""
        # "system" carries the instructions; "user" carries the filled-in input
        # template plus any literal text from the output template (suffix).
        return {
            "system": instructions_template,
            "user": input_template.format(**row, **extra_fields) + suffix,
        }

    async def batch_to_batch(
        self,
        batch: InternalDataFrame,
        input_template: str,
        instructions_template: str,
        output_template: str,
        extra_fields: Optional[Dict[str, str]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = True,
    ) -> InternalDataFrame:
        """Execute batch of requests with async calls to OpenAI API"""

        extra_fields = extra_fields or {}
        field_schema = field_schema or {}

        # Collect enum options for array-typed fields so free-text completions
        # can be matched back to an allowed value via match_options below.
        options = {}
        for field, schema in field_schema.items():
            if schema.get("type") == "array":
                options[field] = schema.get("items", {}).get("enum", [])

        output_fields = parse_template(
            partial_str_format(output_template, **extra_fields), include_texts=True
        )

        # parse_template yields alternating text/var chunks; more than two
        # chunks implies more than a single "<text><variable>" template.
        if len(output_fields) > 2:
            raise NotImplementedError("Only one output field is supported")

        suffix = ""
        outputs = []
        for output_field in output_fields:
            if output_field["type"] == "text":
                # Literal template text is appended to each prompt as a suffix.
                suffix += output_field["text"]

            elif output_field["type"] == "var":
                name = output_field["text"]
                # prepare prompts
                prompts = batch.apply(
                    lambda row: self._prepare_prompt(
                        row, input_template, instructions_template, suffix, extra_fields
                    ),
                    axis=1,
                ).tolist()

                responses = await async_concurrent_create_completion(
                    prompts=prompts,
                    max_concurrent_requests=self.concurrent_clients,
                    instruction_first=instructions_first,
                    timeout=self.timeout,
                    max_tokens=self.max_tokens,
                    temperature=self.temperature,
                    openai_model=self.openai_model,
                    openai_api_key=self.openai_api_key,
                )

                # parse responses, optionally match it with options
                # NOTE(review): failed rows are skipped, so the returned frame
                # may contain fewer rows than the input batch.
                for prompt, response, idx in zip(prompts, responses, batch.index):
                    # check for errors - if any, append to outputs and continue
                    if response.get("error"):
                        # FIXME if we collect failed and succeeded outputs in the same list -> df, we end up with an awkward schema like this:
                        # output error message details
                        # ---------------------------
                        # output1 nan    nan      nan
                        # nan     true   message2 details2
                        # we are not going to send the error response to lse
                        # outputs.append(response)
                        if self.verbose:
                            print_error(
                                f"Prompt: {prompt}\nOpenAI API error: {response}"
                            )
                        continue

                    # otherwise, append the response to outputs
                    completion_text = response["text"]
                    if self.verbose:
                        print(
                            f"Prompt: {prompt}\nOpenAI API response: {completion_text}"
                        )
                    if name in options:
                        # Snap the free-text completion to one of the allowed options.
                        completion_text = match_options(completion_text, options[name])
                    outputs.append({name: completion_text, "index": idx})

        # TODO: note that this doesn't work for multiple output fields e.g. `Output {output1} and Output {output2}`
        output_df = InternalDataFrame(outputs)
        # return output dataframe indexed as input batch.index, assuming outputs are in the same order as inputs
        return output_df.set_index("index")

    async def record_to_record(
        self,
        record: Dict[str, str],
        input_template: str,
        instructions_template: str,
        output_template: str,
        extra_fields: Optional[Dict[str, Any]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = True,
    ) -> Dict[str, str]:
        """Single-record processing is not supported by this runtime; use batch_to_batch."""
        raise NotImplementedError("record_to_record is not implemented")

batch_to_batch(batch, input_template, instructions_template, output_template, extra_fields=None, field_schema=None, instructions_first=True) async

Execute batch of requests with async calls to OpenAI API

Source code in adala/runtimes/_openai.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
async def batch_to_batch(
    self,
    batch: InternalDataFrame,
    input_template: str,
    instructions_template: str,
    output_template: str,
    extra_fields: Optional[Dict[str, str]] = None,
    field_schema: Optional[Dict] = None,
    instructions_first: bool = True,
) -> InternalDataFrame:
    """Execute batch of requests with async calls to OpenAI API.

    Builds one prompt per row of `batch`, fans the requests out concurrently,
    and collects the completions into a dataframe.

    Args:
        batch: Input dataframe; one prompt is built per row.
        input_template: Template for the input part of each prompt.
        instructions_template: Template for the instructions part of each prompt.
        output_template: Template describing the output; only a single output
            variable is supported.
        extra_fields: Extra values substituted into the templates. Defaults to None.
        field_schema: JSON schema per output field; fields of type "array" with
            an "items.enum" list get their completions matched against those
            options. Defaults to None.
        instructions_first: Whether instructions are sent before the input.

    Returns:
        InternalDataFrame: Completions indexed by the corresponding `batch.index`
        values; rows whose API call returned an error are dropped.
    """

    extra_fields = extra_fields or {}
    field_schema = field_schema or {}

    # Collect enum options for fields declared as arrays; completions for these
    # fields are normalized with match_options() below.
    options = {}
    for field, schema in field_schema.items():
        if schema.get("type") == "array":
            options[field] = schema.get("items", {}).get("enum", [])

    output_fields = parse_template(
        partial_str_format(output_template, **extra_fields), include_texts=True
    )

    # NOTE(review): parse_template yields interleaved text/var chunks, so a
    # "text {var} text" template parses to 3 chunks and would be rejected here
    # even though it has one variable — presumably this guard targets multiple
    # output variables; confirm intended.
    if len(output_fields) > 2:
        raise NotImplementedError("Only one output field is supported")

    suffix = ""
    outputs = []
    for output_field in output_fields:
        if output_field["type"] == "text":
            # Literal template text preceding the variable is accumulated and
            # appended to every prompt as a suffix.
            suffix += output_field["text"]

        elif output_field["type"] == "var":
            name = output_field["text"]
            # prepare prompts
            prompts = batch.apply(
                lambda row: self._prepare_prompt(
                    row, input_template, instructions_template, suffix, extra_fields
                ),
                axis=1,
            ).tolist()

            # Fan out all prompts concurrently, bounded by self.concurrent_clients.
            responses = await async_concurrent_create_completion(
                prompts=prompts,
                max_concurrent_requests=self.concurrent_clients,
                instruction_first=instructions_first,
                timeout=self.timeout,
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                openai_model=self.openai_model,
                openai_api_key=self.openai_api_key,
            )

            # parse responses, optionally match it with options
            for prompt, response, idx in zip(prompts, responses, batch.index):
                # check for errors - if any, append to outputs and continue
                if response.get("error"):
                    # FIXME if we collect failed and succeeded outputs in the same list -> df, we end up with an awkward schema like this:
                    # output error message details
                    # ---------------------------
                    # output1 nan    nan      nan
                    # nan     true   message2 details2
                    # we are not going to send the error response to lse
                    # outputs.append(response)
                    if self.verbose:
                        print_error(
                            f"Prompt: {prompt}\nOpenAI API error: {response}"
                        )
                    continue

                # otherwise, append the response to outputs
                completion_text = response["text"]
                if self.verbose:
                    print(
                        f"Prompt: {prompt}\nOpenAI API response: {completion_text}"
                    )
                if name in options:
                    completion_text = match_options(completion_text, options[name])
                outputs.append({name: completion_text, "index": idx})

    # TODO: note that this doesn't work for multiple output fields e.g. `Output {output1} and Output {output2}`
    output_df = InternalDataFrame(outputs)
    # return output dataframe indexed as input batch.index, assuming outputs are in the same order as inputs
    return output_df.set_index("index")

OpenAIChatRuntime

Bases: Runtime

Runtime that uses OpenAI API and chat completion models to perform the skill.

Attributes:

Name Type Description
openai_model str

OpenAI model name.

openai_api_key Optional[str]

OpenAI API key. If not provided, will be taken from OPENAI_API_KEY environment variable.

max_tokens Optional[int]

Maximum number of tokens to generate. Defaults to 1000.

Source code in adala/runtimes/_openai.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class OpenAIChatRuntime(Runtime):
    """
    Runtime that uses [OpenAI API](https://openai.com/) and chat completion models to perform the skill.

    Attributes:
        openai_model: OpenAI model name.
        openai_api_key: OpenAI API key. If not provided, will be taken from OPENAI_API_KEY environment variable.
        max_tokens: Maximum number of tokens to generate. Defaults to 1000.
        splitter: Optional separator string; not referenced in this class —
            presumably consumed by subclasses or callers, TODO confirm.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)  # for @computed_field

    # Exposed under the alias "model" in the pydantic interface.
    openai_model: str = Field(alias="model")
    # Default is resolved from the environment at class-definition time.
    openai_api_key: Optional[str] = Field(
        default=os.getenv("OPENAI_API_KEY"), alias="api_key"
    )
    max_tokens: Optional[int] = 1000
    splitter: Optional[str] = None

    @computed_field
    def _client(self) -> OpenAI:
        # NOTE(review): computed on each access, so every use constructs a new
        # OpenAI client instance — consider caching if that proves costly; confirm intended.
        return OpenAI(api_key=self.openai_api_key)

    def init_runtime(self) -> "Runtime":
        # check model availability
        try:
            self._client.models.retrieve(self.openai_model)
        except NotFoundError:
            raise ValueError(
                f'Requested model "{self.openai_model}" is not available in your OpenAI account.'
            )
        return self

    def execute(self, messages: List):
        """
        Execute OpenAI request given list of messages in OpenAI API format
        """
        if self.verbose:
            print(f"OpenAI request: {messages}")

        completion = self._client.chat.completions.create(
            model=self.openai_model, messages=messages
        )
        completion_text = completion.choices[0].message.content

        if self.verbose:
            print(f"OpenAI response: {completion_text}")
        return completion_text

    def record_to_record(
        self,
        record: Dict[str, str],
        input_template: str,
        instructions_template: str,
        output_template: str,
        extra_fields: Optional[Dict[str, str]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = False,
    ) -> Dict[str, str]:
        """
        Execute OpenAI request given record and templates for input, instructions and output.

        Args:
            record: Record to be used for input, instructions and output templates.
            input_template: Template for input message.
            instructions_template: Template for instructions message.
            output_template: Template for output message.
            extra_fields: Extra fields to be used in templates.
            field_schema: Field schema to be used for parsing templates.
            instructions_first: If True, instructions will be sent before input.
                NOTE(review): this flag is never referenced in the body —
                instructions are always sent as the leading system message.

        Returns:
            Dict[str, str]: Output record.
        """

        extra_fields = extra_fields or {}
        field_schema = field_schema or {}

        # Map fields declared as arrays to their enum options; completions for
        # these fields are normalized with match_options() below.
        options = {}
        for field, schema in field_schema.items():
            if schema.get("type") == "array":
                options[field] = schema.get("items", {}).get("enum", [])

        output_fields = parse_template(
            partial_str_format(output_template, **extra_fields), include_texts=True
        )
        system_prompt = instructions_template
        user_prompt = input_template.format(**record, **extra_fields)
        messages = [{"role": "system", "content": system_prompt}]

        outputs = {}
        for output_field in output_fields:
            if output_field["type"] == "text":
                # Literal template text is appended to the pending user prompt.
                if user_prompt is not None:
                    user_prompt += f"\n{output_field['text']}"
                else:
                    user_prompt = output_field["text"]
            elif output_field["type"] == "var":
                # Each output variable triggers one chat turn; the assistant's
                # reply is stored and fed back as conversation context.
                name = output_field["text"]
                messages.append({"role": "user", "content": user_prompt})
                completion_text = self.execute(messages)
                if name in options:
                    completion_text = match_options(completion_text, options[name])
                outputs[name] = completion_text
                messages.append({"role": "assistant", "content": completion_text})
                user_prompt = None

        return outputs

execute(messages)

Execute OpenAI request given list of messages in OpenAI API format

Source code in adala/runtimes/_openai.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def execute(self, messages: List):
    """Send a chat-completion request with the given OpenAI-format messages and return the reply text."""
    if self.verbose:
        print(f"OpenAI request: {messages}")

    response = self._client.chat.completions.create(
        model=self.openai_model, messages=messages
    )
    reply_text = response.choices[0].message.content
    if self.verbose:
        print(f"OpenAI response: {reply_text}")
    return reply_text

record_to_record(record, input_template, instructions_template, output_template, extra_fields=None, field_schema=None, instructions_first=False)

Execute OpenAI request given record and templates for input, instructions and output.

Parameters:

Name Type Description Default
record Dict[str, str]

Record to be used for input, instructions and output templates.

required
input_template str

Template for input message.

required
instructions_template str

Template for instructions message.

required
output_template str

Template for output message.

required
extra_fields Optional[Dict[str, str]]

Extra fields to be used in templates.

None
field_schema Optional[Dict]

Field schema to be used for parsing templates.

None
instructions_first bool

If True, instructions will be sent before input.

False

Returns:

Type Description
Dict[str, str]

Dict[str, str]: Output record.

Source code in adala/runtimes/_openai.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def record_to_record(
    self,
    record: Dict[str, str],
    input_template: str,
    instructions_template: str,
    output_template: str,
    extra_fields: Optional[Dict[str, str]] = None,
    field_schema: Optional[Dict] = None,
    instructions_first: bool = False,
) -> Dict[str, str]:
    """
    Execute OpenAI request given record and templates for input, instructions and output.

    Args:
        record: Record to be used for input, instructions and output templates.
        input_template: Template for input message.
        instructions_template: Template for instructions message.
        output_template: Template for output message.
        extra_fields: Extra fields to be used in templates.
        field_schema: Field schema to be used for parsing templates.
        instructions_first: If True, instructions will be sent before input.

    Returns:
        Dict[str, str]: Output record.
    """

    extra_fields = extra_fields or {}
    field_schema = field_schema or {}

    # Fields declared as arrays carry enum options; their completions are
    # normalized with match_options() below.
    options = {
        field_name: schema.get("items", {}).get("enum", [])
        for field_name, schema in field_schema.items()
        if schema.get("type") == "array"
    }

    output_fields = parse_template(
        partial_str_format(output_template, **extra_fields), include_texts=True
    )
    pending_prompt = input_template.format(**record, **extra_fields)
    messages = [{"role": "system", "content": instructions_template}]

    outputs = {}
    for chunk in output_fields:
        if chunk["type"] == "text":
            # Literal template text extends the pending user prompt.
            pending_prompt = (
                chunk["text"]
                if pending_prompt is None
                else pending_prompt + f"\n{chunk['text']}"
            )
        elif chunk["type"] == "var":
            # One chat turn per output variable; the reply becomes context
            # for subsequent turns.
            var_name = chunk["text"]
            messages.append({"role": "user", "content": pending_prompt})
            completion_text = self.execute(messages)
            if var_name in options:
                completion_text = match_options(completion_text, options[var_name])
            outputs[var_name] = completion_text
            messages.append({"role": "assistant", "content": completion_text})
            pending_prompt = None

    return outputs

OpenAIVisionRuntime

Bases: OpenAIChatRuntime

Runtime that uses OpenAI API and vision models to perform the skill. Only compatible with OpenAI API version 1.0.0 or higher.

Source code in adala/runtimes/_openai.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
class OpenAIVisionRuntime(OpenAIChatRuntime):
    """
    Runtime that uses [OpenAI API](https://openai.com/) and vision models to perform the skill.
    Only compatible with OpenAI API version 1.0.0 or higher.
    """

    def record_to_record(
        self,
        record: Dict[str, str],
        input_template: str,
        instructions_template: str,
        output_template: str,
        extra_fields: Optional[Dict[str, str]] = None,
        field_schema: Optional[Dict] = None,
        instructions_first: bool = False,
    ) -> Dict[str, str]:
        """
        Execute OpenAI request given record and templates for input, instructions and output.

        Args:
            record: Record to be used for input, instructions and output templates.
            input_template: Template for input message.
            instructions_template: Template for instructions message.
            output_template: Template for output message.
            extra_fields: Extra fields to be used in templates.
            field_schema: Field jsonschema to be used for parsing templates.
                         Field schema must contain "format": "uri" for image fields. For example:
                            ```json
                            {
                                "image": {
                                    "type": "string",
                                    "format": "uri"
                                }
                            }
                            ```
            instructions_first: If True, instructions will be sent before input.
                NOTE(review): this flag is never referenced in the body.
        """

        # Vision content parts require the openai>=1.0 client interface.
        if not check_if_new_openai_version():
            raise NotImplementedError(
                f"{self.__class__.__name__} requires OpenAI API version 1.0.0 or higher."
            )

        extra_fields = extra_fields or {}
        field_schema = field_schema or {}

        output_fields = parse_template(
            partial_str_format(output_template, **extra_fields), include_texts=False
        )

        # Exactly one output variable is supported for vision prompts.
        if len(output_fields) > 1:
            raise NotImplementedError(
                f"{self.__class__.__name__} does not support multiple output fields. "
                f"Found: {output_fields}"
            )
        output_field = output_fields[0]
        output_field_name = output_field["text"]

        input_fields = parse_template(input_template)

        # split input template into text and image parts
        input_text = ""
        content = [
            {
                "type": "text",
                "text": instructions_template,
            }
        ]
        for field in input_fields:
            if field["type"] == "text":
                input_text += field["text"]
            elif field["type"] == "var":
                if field["text"] not in field_schema:
                    # Fields without a schema entry are treated as plain text.
                    input_text += record[field["text"]]
                elif field_schema[field["text"]]["type"] == "string":
                    if field_schema[field["text"]].get("format") == "uri":
                        # Flush accumulated text before appending the image part.
                        if input_text:
                            content.append({"type": "text", "text": input_text})
                            input_text = ""
                        # NOTE(review): newer OpenAI APIs expect
                        # {"image_url": {"url": ...}} rather than a bare value —
                        # confirm against the pinned openai client version.
                        content.append(
                            {"type": "image_url", "image_url": record[field["text"]]}
                        )
                    else:
                        input_text += record[field["text"]]
                else:
                    raise ValueError(
                        f'Unsupported field type: {field_schema[field["text"]]["type"]}'
                    )
        if input_text:
            content.append({"type": "text", "text": input_text})

        if self.verbose:
            print(f"**Prompt content**:\n{content}")

        completion = self._client.chat.completions.create(
            model=self.openai_model,
            messages=[{"role": "user", "content": content}],
            max_tokens=self.max_tokens,
        )

        completion_text = completion.choices[0].message.content
        return {output_field_name: completion_text}

record_to_record(record, input_template, instructions_template, output_template, extra_fields=None, field_schema=None, instructions_first=False)

Execute OpenAI request given record and templates for input, instructions and output.

Parameters:

Name Type Description Default
record Dict[str, str]

Record to be used for input, instructions and output templates.

required
input_template str

Template for input message.

required
instructions_template str

Template for instructions message.

required
output_template str

Template for output message.

required
extra_fields Optional[Dict[str, str]]

Extra fields to be used in templates.

None
field_schema Optional[Dict]

Field JSON schema to be used for parsing templates. The field schema must contain "format": "uri" for image fields. For example: json { "image": { "type": "string", "format": "uri" } }

None
instructions_first bool

If True, instructions will be sent before input.

False
Source code in adala/runtimes/_openai.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
def record_to_record(
    self,
    record: Dict[str, str],
    input_template: str,
    instructions_template: str,
    output_template: str,
    extra_fields: Optional[Dict[str, str]] = None,
    field_schema: Optional[Dict] = None,
    instructions_first: bool = False,
) -> Dict[str, str]:
    """
    Execute OpenAI request given record and templates for input, instructions and output.

    Args:
        record: Record to be used for input, instructions and output templates.
        input_template: Template for input message.
        instructions_template: Template for instructions message.
        output_template: Template for output message.
        extra_fields: Extra fields to be used in templates.
        field_schema: Field jsonschema to be used for parsing templates.
                     Field schema must contain "format": "uri" for image fields. For example:
                        ```json
                        {
                            "image": {
                                "type": "string",
                                "format": "uri"
                            }
                        }
                        ```
        instructions_first: If True, instructions will be sent before input.
            NOTE(review): this flag is never referenced in the body.
    """

    # Vision content parts require the openai>=1.0 client interface.
    if not check_if_new_openai_version():
        raise NotImplementedError(
            f"{self.__class__.__name__} requires OpenAI API version 1.0.0 or higher."
        )

    extra_fields = extra_fields or {}
    field_schema = field_schema or {}

    output_fields = parse_template(
        partial_str_format(output_template, **extra_fields), include_texts=False
    )

    # Exactly one output variable is supported for vision prompts.
    if len(output_fields) > 1:
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support multiple output fields. "
            f"Found: {output_fields}"
        )
    output_field = output_fields[0]
    output_field_name = output_field["text"]

    input_fields = parse_template(input_template)

    # split input template into text and image parts
    input_text = ""
    content = [
        {
            "type": "text",
            "text": instructions_template,
        }
    ]
    for field in input_fields:
        if field["type"] == "text":
            input_text += field["text"]
        elif field["type"] == "var":
            if field["text"] not in field_schema:
                # Fields without a schema entry are treated as plain text.
                input_text += record[field["text"]]
            elif field_schema[field["text"]]["type"] == "string":
                if field_schema[field["text"]].get("format") == "uri":
                    # Flush accumulated text before appending the image part.
                    if input_text:
                        content.append({"type": "text", "text": input_text})
                        input_text = ""
                    # NOTE(review): newer OpenAI APIs expect
                    # {"image_url": {"url": ...}} rather than a bare value —
                    # confirm against the pinned openai client version.
                    content.append(
                        {"type": "image_url", "image_url": record[field["text"]]}
                    )
                else:
                    input_text += record[field["text"]]
            else:
                raise ValueError(
                    f'Unsupported field type: {field_schema[field["text"]]["type"]}'
                )
    if input_text:
        content.append({"type": "text", "text": input_text})

    if self.verbose:
        print(f"**Prompt content**:\n{content}")

    completion = self._client.chat.completions.create(
        model=self.openai_model,
        messages=[{"role": "user", "content": content}],
        max_tokens=self.max_tokens,
    )

    completion_text = completion.choices[0].message.content
    return {output_field_name: completion_text}

async_create_completion(model, user_prompt, system_prompt=None, openai_api_key=None, instruction_first=True, semaphore=None, max_tokens=1000, temperature=0.0, session=None, default_timeout=10) async

Async version of create_completion function with error handling and session timeout.

Parameters:

Name Type Description Default
model str

OpenAI model name.

required
user_prompt str

User prompt.

required
system_prompt str

System prompt.

None
openai_api_key str

OpenAI API key (if not set, will use OPENAI_API_KEY environment variable).

None
instruction_first bool

Whether to put instructions first.

True
semaphore Semaphore

Semaphore to limit concurrent requests.

None
max_tokens int

Maximum tokens to generate.

1000
temperature float

Temperature for sampling.

0.0
session ClientSession

aiohttp session to use for requests.

None
default_timeout int

Default timeout for the session.

10

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: OpenAI response or error message.

Source code in adala/runtimes/_openai.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
async def async_create_completion(
    model: str,
    user_prompt: str,
    system_prompt: str = None,
    openai_api_key: str = None,
    instruction_first: bool = True,
    semaphore: asyncio.Semaphore = None,
    max_tokens: int = 1000,
    temperature: float = 0.0,
    session: aiohttp.ClientSession = None,
    default_timeout: int = 10,
) -> Dict[str, Any]:
    """
    Async version of create_completion function with error handling and session timeout.

    Args:
        model: OpenAI model name.
        user_prompt: User prompt.
        system_prompt: System prompt.
        openai_api_key: OpenAI API key (if not set, will use OPENAI_API_KEY environment variable).
        instruction_first: Whether to put instructions first.
        semaphore: Semaphore to limit concurrent requests.
        max_tokens: Maximum tokens to generate.
        temperature: Temperature for sampling.
        session: aiohttp session to use for requests. If omitted, a temporary
            session is created for this call and closed before returning.
        default_timeout: Default timeout (seconds) for a session created here.

    Returns:
        Dict[str, Any]: OpenAI response or error message.
    """
    openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
    if not semaphore:
        semaphore = asyncio.Semaphore(1)
    # Remember whether this call owns the session: a session created here must
    # also be closed here, otherwise its connector leaks on every call.
    owns_session = session is None
    if owns_session:
        session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=default_timeout)
        )
    messages = [{"role": "user", "content": user_prompt}]
    if system_prompt:
        if instruction_first:
            messages.insert(0, {"role": "system", "content": system_prompt})
        else:
            # NOTE(review): the system prompt is concatenated without a
            # separator when instruction_first is False — confirm intended.
            messages[0]["content"] += system_prompt

    try:
        async with semaphore, session.post(
            DEFAULT_CREATE_COMPLETION_URL,
            headers={"Authorization": f"Bearer {openai_api_key}"},
            json={
                "messages": messages,
                "model": model,
                "max_tokens": max_tokens,
                "temperature": temperature,
            },
        ) as response:
            response.raise_for_status()
            response_json = await response.json()
            completion_text = response_json["choices"][0]["message"]["content"]
            return {
                "text": completion_text,
            }
    except aiohttp.ClientResponseError as e:
        # Handle HTTP errors (non-2xx raised by raise_for_status)
        return {
            "error": True,
            "message": f"HTTP error: {e.status}",
            "details": str(e),
        }
    except aiohttp.ClientError as e:
        # Handle other aiohttp specific errors
        return {
            "error": True,
            "message": "Client error",
            "details": str(e),
        }
    except asyncio.TimeoutError as e:
        # Handle timeout errors
        return {
            "error": True,
            "message": "Request timed out",
            "details": str(e),
        }
    except Exception as e:
        # Handle other exceptions
        return {
            "error": True,
            "message": "Unknown error",
            "details": str(e),
        }
    finally:
        # Close only sessions we created; caller-supplied sessions stay open.
        if owns_session:
            await session.close()