Skip to content

Skills

AnalysisSkill

Bases: Skill

Analysis skill that analyzes a dataframe and returns a record (e.g. for data analysis purposes). See base class Skill for more information about the attributes.

Source code in adala/skills/_base.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
class AnalysisSkill(Skill):
    """
    Analysis skill that analyzes a dataframe and returns a record (e.g. for data analysis purposes).
    See base class Skill for more information about the attributes.

    Attributes:
        input_separator (str): Separator used when concatenating formatted input rows
            into a single prompt string. Defaults to a single newline.
        chunk_size (Optional[int]): If set, the input is split into chunks of this many
            rows and each chunk is processed as a separate record; otherwise the whole
            input is processed as one chunk.
    """

    input_separator: str = "\n"
    chunk_size: Optional[int] = None

    def apply(
        self,
        input: Union[InternalDataFrame, InternalSeries, Dict],
        runtime: Runtime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a dataframe and returns a record.

        Args:
            input (Union[InternalDataFrame, InternalSeries, Dict]): The input data to be processed.
            runtime (Runtime): The runtime instance to be used for processing.

        Returns:
            InternalDataFrame: A dataframe with one row per processed chunk.
        """
        # Normalize any supported input type to a dataframe so chunking is uniform.
        if isinstance(input, InternalSeries):
            input = input.to_frame()
        elif isinstance(input, dict):
            input = InternalDataFrame([input])

        extra_fields = self._get_extra_fields()

        # if chunk_size is specified, split the input into chunks and process each chunk separately
        if self.chunk_size is not None:
            chunks = (
                input.iloc[i : i + self.chunk_size]
                for i in range(0, len(input), self.chunk_size)
            )
            # Ceiling division: a trailing partial chunk still counts as one chunk.
            # Plain floor division under-reports the tqdm total whenever len(input)
            # is not a multiple of chunk_size, and reports 0 when len(input) < chunk_size.
            total = (len(input) + self.chunk_size - 1) // self.chunk_size
        else:
            chunks = [input]
            total = 1
        outputs = []
        for chunk in tqdm(chunks, desc="Processing chunks", total=total):
            # Render each row with the input template (the 1-based row position is
            # exposed as {i}), then join the rendered rows into one prompt string.
            agg_chunk = (
                chunk.reset_index()
                .apply(
                    lambda row: self.input_template.format(
                        **row, **extra_fields, i=int(row.name) + 1
                    ),
                    axis=1,
                )
                .str.cat(sep=self.input_separator)
            )
            output = runtime.record_to_record(
                {"input": agg_chunk},
                input_template="{input}",
                output_template=self.output_template,
                instructions_template=self.instructions,
                extra_fields=extra_fields,
                instructions_first=self.instructions_first,
            )
            outputs.append(InternalSeries(output))
        output = InternalDataFrame(outputs)

        return output

    def improve(self, **kwargs):
        """
        Improves the skill.

        Raises:
            NotImplementedError: Improvement is not supported for analysis skills.
        """
        raise NotImplementedError

apply(input, runtime)

Applies the skill to a dataframe and returns a record.

Parameters:

Name Type Description Default
input InternalDataFrame

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalSeries InternalDataFrame

The record containing the analysis results.

Source code in adala/skills/_base.py
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
def apply(
    self,
    input: Union[InternalDataFrame, InternalSeries, Dict],
    runtime: Runtime,
) -> InternalDataFrame:
    """
    Applies the skill to a dataframe and returns a record.

    Args:
        input (Union[InternalDataFrame, InternalSeries, Dict]): The input data to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalDataFrame: A dataframe with one row per processed chunk.
    """
    # Normalize any supported input type to a dataframe so chunking is uniform.
    if isinstance(input, InternalSeries):
        input = input.to_frame()
    elif isinstance(input, dict):
        input = InternalDataFrame([input])

    extra_fields = self._get_extra_fields()

    # if chunk_size is specified, split the input into chunks and process each chunk separately
    if self.chunk_size is not None:
        chunks = (
            input.iloc[i : i + self.chunk_size]
            for i in range(0, len(input), self.chunk_size)
        )
        # Ceiling division: a trailing partial chunk still counts as one chunk.
        # Plain floor division under-reports the tqdm total whenever len(input)
        # is not a multiple of chunk_size, and reports 0 when len(input) < chunk_size.
        total = (len(input) + self.chunk_size - 1) // self.chunk_size
    else:
        chunks = [input]
        total = 1
    outputs = []
    for chunk in tqdm(chunks, desc="Processing chunks", total=total):
        # Render each row with the input template (the 1-based row position is
        # exposed as {i}), then join the rendered rows into one prompt string.
        agg_chunk = (
            chunk.reset_index()
            .apply(
                lambda row: self.input_template.format(
                    **row, **extra_fields, i=int(row.name) + 1
                ),
                axis=1,
            )
            .str.cat(sep=self.input_separator)
        )
        output = runtime.record_to_record(
            {"input": agg_chunk},
            input_template="{input}",
            output_template=self.output_template,
            instructions_template=self.instructions,
            extra_fields=extra_fields,
            instructions_first=self.instructions_first,
        )
        outputs.append(InternalSeries(output))
    output = InternalDataFrame(outputs)

    return output

improve(**kwargs)

Improves the skill.

Source code in adala/skills/_base.py
488
489
490
491
492
def improve(self, **kwargs):
    """Improving this skill type is not supported."""
    raise NotImplementedError

SampleTransformSkill

Bases: TransformSkill

Source code in adala/skills/_base.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
class SampleTransformSkill(TransformSkill):
    """Transform skill that runs on a random sample of the input dataframe."""

    # Number of rows randomly drawn from the input before transforming.
    sample_size: int

    def apply(
        self,
        input: InternalDataFrame,
        runtime: Runtime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a random sample of the dataframe and returns a dataframe.

        Args:
            input (InternalDataFrame): The input data to be processed.
            runtime (Runtime): The runtime instance to be used for processing.

        Returns:
            InternalDataFrame: The processed data.
        """
        sampled = input.sample(self.sample_size)
        return super().apply(sampled, runtime)

apply(input, runtime)

Applies the skill to a dataframe and returns a dataframe.

Parameters:

Name Type Description Default
input InternalDataFrame

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The processed data.

Source code in adala/skills/_base.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
def apply(
    self,
    input: InternalDataFrame,
    runtime: Runtime,
) -> InternalDataFrame:
    """
    Applies the skill to a random sample of the dataframe and returns a dataframe.

    Args:
        input (InternalDataFrame): The input data to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalDataFrame: The processed data.
    """
    # Draw `sample_size` rows at random, then delegate to the parent transform.
    sampled_input = input.sample(self.sample_size)
    return super(SampleTransformSkill, self).apply(sampled_input, runtime)

Skill

Bases: BaseModelInRegistry

Abstract base class representing a skill.

Provides methods to interact with and obtain information about skills.

Attributes:

Name Type Description
name str

Unique name of the skill.

instructions str

Instructs agent what to do with the input data.

input_template str

Template for the input data.

output_template str

Template for the output data.

description Optional[str]

Description of the skill.

field_schema Optional[Dict]

Field JSON schema to use in the templates. Defaults to all fields are strings, i.e. analogous to {"field_n": {"type": "string"}}.

extra_fields Optional[Dict[str, str]]

Extra fields to use in the templates. Defaults to None.

instructions_first bool

Flag indicating if instructions should be executed before input. Defaults to True.

verbose bool

Flag indicating if runtime outputs should be verbose. Defaults to False.

frozen bool

Flag indicating if the skill is frozen. Defaults to False.

type ClassVar[str]

Type of the skill.

Source code in adala/skills/_base.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class Skill(BaseModelInRegistry):
    """
    Abstract base class representing a skill.

    Provides methods to interact with and obtain information about skills.

    Attributes:
        name (str): Unique name of the skill.
        instructions (str): Instructs agent what to do with the input data.
        input_template (str): Template for the input data.
        output_template (str): Template for the output data.
        description (Optional[str]): Description of the skill.
        field_schema (Optional[Dict]): Field [JSON schema](https://json-schema.org/) to use in the templates. Defaults to all fields are strings,
            i.e. analogous to {"field_n": {"type": "string"}}.
        extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None.
        instructions_first (bool): Flag indicating if instructions should be executed before input. Defaults to True.
        verbose (bool): Flag indicating if runtime outputs should be verbose. Defaults to False.
        frozen (bool): Flag indicating if the skill is frozen. Defaults to False.
        type (ClassVar[str]): Type of the skill.
    """

    # NOTE(review): the docstring also lists `extra_fields`, `verbose` and `type`,
    # which are not declared below — presumably defined on BaseModelInRegistry or
    # elsewhere; confirm against the base class.
    name: str = Field(
        title="Skill name",
        description="Unique name of the skill",
        examples=["labeling", "classification", "text-generation"],
    )
    instructions: str = Field(
        title="Skill instructions",
        description="Instructs agent what to do with the input data. "
        "Can use templating to refer to input fields.",
        examples=["Label the input text with the following labels: {labels}"],
    )
    input_template: str = Field(
        title="Input template",
        description="Template for the input data. "
        "Can use templating to refer to input parameters and perform data transformations.",
        examples=["Input: {input}", "Input: {input}\nLabels: {labels}\nOutput: "],
    )
    output_template: str = Field(
        title="Output template",
        description="Template for the output data. "
        "Can use templating to refer to input parameters and perform data transformations",
        examples=["Output: {output}", "{predictions}"],
    )
    description: Optional[str] = Field(
        default="",
        title="Skill description",
        description="Description of the skill. Can be used to retrieve skill from the library.",
        examples=["The skill to perform sentiment analysis on the input text."],
    )
    field_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        title="Field schema",
        description="JSON schema for the fields of the input and output data.",
        examples=[
            {
                "input": {"type": "string"},
                "output": {"type": "string"},
                "labels": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": ["positive", "negative", "neutral"],
                    },
                },
            }
        ],
    )
    instructions_first: bool = Field(
        default=True,
        title="Instructions first",
        description="Flag indicating if instructions should be shown before the input data.",
        examples=[True, False],
    )

    frozen: bool = Field(
        default=False,
        title="Frozen",
        description="Flag indicating if the skill is frozen.",
        examples=[True, False],
    )

    def _get_extra_fields(self) -> dict:
        """
        Retrieves fields that are not categorized as system fields.

        Returns:
            dict: A dictionary containing fields that are not system fields.
        """

        # TODO: more robust way to exclude system fields
        # NOTE(review): `instructions_first` and `frozen` are not in this exclusion
        # set, so they are returned as "extra" fields — confirm this is intended.
        system_fields = {
            "name",
            "description",
            "input_template",
            "output_template",
            "instructions",
            "field_schema",
        }
        extra_fields = self.model_dump(exclude=system_fields)
        return extra_fields

    def get_output_fields(self):
        """
        Retrieves output fields.

        Returns:
            List[str]: A list of output fields.
        """
        extra_fields = self._get_extra_fields()
        # TODO: input fields are not considered - shall we disallow input fields in output template?
        # Fill extra fields into the template first, then parse out the remaining
        # placeholder names (include_texts=False drops literal text segments).
        output_fields = parse_template(
            partial_str_format(self.output_template, **extra_fields),
            include_texts=False,
        )
        return [f["text"] for f in output_fields]

    @abstractmethod
    def apply(self, input, runtime):
        """
        Base method for applying the skill.
        """

    @abstractmethod
    def improve(self, predictions, train_skill_output, feedback, runtime):
        """
        Base method for improving the skill.
        """

apply(input, runtime) abstractmethod

Base method for applying the skill.

Source code in adala/skills/_base.py
133
134
135
136
137
@abstractmethod
def apply(self, input, runtime):
    """
    Base method for applying the skill.

    Args:
        input: The input data to process.
        runtime: The runtime instance used for processing.

    Returns:
        The processed result; the concrete type is defined by subclasses.
    """

get_output_fields()

Retrieves output fields.

Returns:

Type Description

List[str]: A list of output fields.

Source code in adala/skills/_base.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def get_output_fields(self):
    """
    Retrieves output fields.

    Returns:
        List[str]: A list of output fields.
    """
    # TODO: input fields are not considered - shall we disallow input fields in output template?
    # Fill extra fields into the template, then parse out the remaining
    # placeholder names (include_texts=False drops literal text segments).
    rendered_template = partial_str_format(
        self.output_template, **self._get_extra_fields()
    )
    parsed_fields = parse_template(rendered_template, include_texts=False)
    return [field["text"] for field in parsed_fields]

improve(predictions, train_skill_output, feedback, runtime) abstractmethod

Base method for improving the skill.

Source code in adala/skills/_base.py
139
140
141
142
143
@abstractmethod
def improve(self, predictions, train_skill_output, feedback, runtime):
    """
    Base method for improving the skill.

    Args:
        predictions: Predictions previously produced by the skill.
        train_skill_output: Name of the skill's output field being trained.
        feedback: User feedback on the predictions.
        runtime: The runtime instance used for processing.
    """

SynthesisSkill

Bases: Skill

Synthesis skill that synthesizes a dataframe from a record (e.g. for dataset generation purposes). See base class Skill for more information about the attributes.

Source code in adala/skills/_base.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
class SynthesisSkill(Skill):
    """
    Synthesis skill that synthesizes a dataframe from a record (e.g. for dataset generation purposes).
    See base class Skill for more information about the attributes.
    """

    def apply(
        self,
        input: Union[Dict, InternalSeries],
        runtime: Runtime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a record and returns a dataframe.

        Args:
            input (Union[Dict, InternalSeries]): The input record to be processed.
            runtime (Runtime): The runtime instance to be used for processing.

        Returns:
            InternalDataFrame: The synthesized data.
        """
        # Series inputs are converted to a plain dict before handing off to the runtime.
        record = input.to_dict() if isinstance(input, InternalSeries) else input
        return runtime.record_to_batch(
            record,
            input_template=self.input_template,
            output_template=self.output_template,
            instructions_template=self.instructions,
            field_schema=self.field_schema,
            extra_fields=self._get_extra_fields(),
            instructions_first=self.instructions_first,
        )

    def improve(self, **kwargs):
        """Improving this skill type is not supported."""
        raise NotImplementedError

apply(input, runtime)

Applies the skill to a record and returns a dataframe.

Parameters:

Name Type Description Default
input InternalSeries

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The synthesized data.

Source code in adala/skills/_base.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def apply(
    self,
    input: Union[Dict, InternalSeries],
    runtime: Runtime,
) -> InternalDataFrame:
    """
    Applies the skill to a record and returns a dataframe.

    Args:
        input (Union[Dict, InternalSeries]): The input record to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalDataFrame: The synthesized data.
    """
    # Series inputs are converted to a plain dict before handing off to the runtime.
    record = input.to_dict() if isinstance(input, InternalSeries) else input
    return runtime.record_to_batch(
        record,
        input_template=self.input_template,
        output_template=self.output_template,
        instructions_template=self.instructions,
        field_schema=self.field_schema,
        extra_fields=self._get_extra_fields(),
        instructions_first=self.instructions_first,
    )

improve(**kwargs)

Improves the skill.

Source code in adala/skills/_base.py
416
417
418
419
420
def improve(self, **kwargs):
    """Improving this skill type is not supported."""
    raise NotImplementedError

TransformSkill

Bases: Skill

Transform skill that transforms a dataframe to another dataframe (e.g. for data annotation purposes). See base class Skill for more information about the attributes.

Source code in adala/skills/_base.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class TransformSkill(Skill):
    """
    Transform skill that transforms a dataframe to another dataframe (e.g. for data annotation purposes).
    See base class Skill for more information about the attributes.
    """

    def apply(
        self,
        input: InternalDataFrame,
        runtime: Runtime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a dataframe and returns another dataframe.

        Args:
            input (InternalDataFrame): The input data to be processed.
            runtime (Runtime): The runtime instance to be used for processing.

        Returns:
            InternalDataFrame: The transformed data.
        """

        return runtime.batch_to_batch(
            input,
            input_template=self.input_template,
            output_template=self.output_template,
            instructions_template=self.instructions,
            field_schema=self.field_schema,
            extra_fields=self._get_extra_fields(),
            instructions_first=self.instructions_first,
        )

    async def aapply(
        self,
        input: InternalDataFrame,
        runtime: AsyncRuntime,
    ) -> InternalDataFrame:
        """
        Asynchronously applies the skill to a dataframe and returns another dataframe.

        Args:
            input (InternalDataFrame): The input data to be processed.
            runtime (AsyncRuntime): The async runtime instance to be used for processing.

        Returns:
            InternalDataFrame: The transformed data.
        """

        return await runtime.batch_to_batch(
            input,
            input_template=self.input_template,
            output_template=self.output_template,
            instructions_template=self.instructions,
            field_schema=self.field_schema,
            extra_fields=self._get_extra_fields(),
            instructions_first=self.instructions_first,
        )

    def improve(
        self,
        predictions: InternalDataFrame,
        train_skill_output: str,
        feedback,
        runtime: Runtime,
        add_cot: bool = False,
    ):
        """
        Improves the skill by refining `self.instructions` based on user feedback.

        Args:
            predictions (InternalDataFrame): The predictions made by the skill.
            train_skill_output (str): The name of the output field of the skill.
            feedback (InternalDataFrame): The feedback provided by the user.
            runtime (Runtime): The runtime instance to be used for processing (CURRENTLY SUPPORTS ONLY `OpenAIChatRuntime`).
            add_cot (bool): Flag indicating if the skill should use the Chain-of-Thought strategy. Defaults to False.
        """
        if feedback.feedback[train_skill_output].isna().all():
            # No feedback left - nothing to improve
            return

        if feedback.match[train_skill_output].all():
            # all feedback is "correct" - nothing to improve
            return

        # Suffix feedback columns that collide with prediction columns so the
        # merge below keeps both versions distinguishable.
        fb = feedback.feedback.rename(
            columns=lambda x: x + "__fb" if x in predictions.columns else x
        )
        analyzed_df = fb.merge(predictions, left_index=True, right_index=True)

        examples = []

        for i, row in enumerate(analyzed_df.to_dict(orient="records")):
            fb_value = row[f"{train_skill_output}__fb"]
            # Skip rows without usable feedback. NaN is truthy in Python, so a
            # plain truthiness test would let NaN feedback leak into the prompt;
            # the `fb_value != fb_value` comparison catches NaN explicitly.
            if not fb_value or fb_value != fb_value:
                continue
            examples.append(
                f"### Example #{i}\n\n"
                f"{self.input_template.format(**row)}\n\n"
                f"{self.output_template.format(**row)}\n\n"
                f'User feedback: {row[f"{train_skill_output}__fb"]}\n\n'
            )

        examples = "\n".join(examples)

        messages = [{"role": "system", "content": "You are a helpful assistant."}]

        # full template
        if self.instructions_first:
            full_template = f"""
{{prompt}}
{self.input_template}
{self.output_template}"""
        else:
            full_template = f"""
{self.input_template}
{{prompt}}
{self.output_template}"""

        messages += [
            {
                "role": "user",
                "content": f"""
A prompt is a text paragraph that outlines the expected actions and instructs the large language model (LLM) to \
generate a specific output. This prompt is concatenated with the input text, and the \
model then creates the required output.
This describes the full template how the prompt is concatenated with the input to produce the output:

```
{full_template}
```

Here:
- "{self.input_template}" is input template,
- "{{prompt}}" is the LLM prompt,
- "{self.output_template}" is the output template.

Model can produce erroneous output if a prompt is not well defined. \
In our collaboration, we’ll work together to refine a prompt. The process consists of two main steps:

## Step 1
I will provide you with the current prompt along with prediction examples. Each example contains the input text, the final prediction produced by the model, and the user feedback. \
User feedback indicates whether the model prediction is correct or not. \
Your task is to analyze the examples and user feedback, determining whether the \
existing instruction is describing the task reflected by these examples precisely, and suggests changes to the prompt to address the incorrect predictions.

## Step 2
Next, you will carefully review your reasoning in step 1, integrate the insights to refine the prompt, \
and provide me with the new prompt that improves the model’s performance.""",
            }
        ]

        messages += [
            {
                "role": "assistant",
                "content": "Sure, I’d be happy to help you with this prompt engineering problem. "
                "Please provide me with the current prompt and the examples with user feedback.",
            }
        ]

        messages += [
            {
                "role": "user",
                "content": f"""
## Current prompt
{self.instructions}

## Examples
{examples}

Summarize your analysis about incorrect predictions and suggest changes to the prompt.""",
            }
        ]
        # First LLM round: analyze the failing examples (Step 1).
        reasoning = runtime.execute(messages)

        messages += [
            {"role": "assistant", "content": reasoning},
            {
                "role": "user",
                "content": f"""
Now please carefully review your reasoning in Step 1 and help with Step 2: refining the prompt.

## Current prompt
{self.instructions}

## Follow this guidance to refine the prompt:

1. The new prompt should describe the task precisely, and address the points raised in the user feedback.

2. The new prompt should be similar to the current prompt, and only differ in the parts that address the issues you identified in Step 1.
    Example:
    - Current prompt: "Generate a summary of the input text."
    - New prompt: "Generate a summary of the input text. Pay attention to the original style."

3. Reply only with the new prompt. Do not include input and output templates in the prompt.
""",
            },
        ]

        if add_cot:
            cot_instructions = """

4. In the new prompt, you should ask the model to perform step-by-step reasoning, and provide rationale or explanations for its prediction before giving the final answer. \
Instruct the model to give the final answer at the end of the prompt, using the following template: "Final answer: <answer>".
    Example:
    - Current prompt: "Generate a summary of the input text."
    - New prompt: "Generate a summary of the input text. Explain your reasoning step-by-step. Use the following template to give the final answer at the end of the prompt: "Final answer: <answer>"."""
            messages[-1]["content"] += cot_instructions
        # display dialogue:
        for message in messages:
            print(f'"{{{message["role"]}}}":\n{message["content"]}')
        # Second LLM round: produce the refined prompt (Step 2) and adopt it.
        new_prompt = runtime.execute(messages)
        self.instructions = new_prompt

aapply(input, runtime) async

Applies the skill to a dataframe and returns another dataframe.

Parameters:

Name Type Description Default
input InternalDataFrame

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The transformed data.

Source code in adala/skills/_base.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
async def aapply(
    self,
    input: InternalDataFrame,
    runtime: AsyncRuntime,
) -> InternalDataFrame:
    """
    Asynchronously applies the skill to a dataframe and returns another dataframe.

    Args:
        input (InternalDataFrame): The input data to be processed.
        runtime (AsyncRuntime): The async runtime instance to be used for processing.

    Returns:
        InternalDataFrame: The transformed data.
    """
    # Delegate the batch transformation to the async runtime and await the result.
    result = await runtime.batch_to_batch(
        input,
        input_template=self.input_template,
        output_template=self.output_template,
        instructions_template=self.instructions,
        field_schema=self.field_schema,
        extra_fields=self._get_extra_fields(),
        instructions_first=self.instructions_first,
    )
    return result

apply(input, runtime)

Applies the skill to a dataframe and returns another dataframe.

Parameters:

Name Type Description Default
input InternalDataFrame

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The transformed data.

Source code in adala/skills/_base.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def apply(
    self,
    input: InternalDataFrame,
    runtime: Runtime,
) -> InternalDataFrame:
    """
    Transform the input dataframe into another dataframe via the runtime.

    Args:
        input (InternalDataFrame): The input data to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalDataFrame: The transformed data.
    """
    # Resolve skill configuration up front, then hand the batch off to the
    # runtime, which performs the actual transformation.
    transform_config = {
        "input_template": self.input_template,
        "output_template": self.output_template,
        "instructions_template": self.instructions,
        "field_schema": self.field_schema,
        "extra_fields": self._get_extra_fields(),
        "instructions_first": self.instructions_first,
    }
    return runtime.batch_to_batch(input, **transform_config)

improve(predictions, train_skill_output, feedback, runtime, add_cot=False)

Improves the skill.

Parameters:

Name Type Description Default
predictions InternalDataFrame

The predictions made by the skill.

required
train_skill_output str

The name of the output field of the skill.

required
feedback InternalDataFrame

The feedback provided by the user.

required
runtime Runtime

The runtime instance to be used for processing (CURRENTLY SUPPORTS ONLY OpenAIChatRuntime).

required
add_cot bool

Flag indicating if the skill should use the Chain-of-Thought strategy. Defaults to False.

False
Source code in adala/skills/_base.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
    def improve(
        self,
        predictions: InternalDataFrame,
        train_skill_output: str,
        feedback: InternalDataFrame,
        runtime: Runtime,
        add_cot: bool = False,
    ):
        """
        Improves the skill instructions by analyzing feedback on predictions.

        Runs a two-step prompt-engineering dialogue with the runtime LLM:
        first asking for an analysis of incorrect predictions, then for a
        refined prompt, which replaces ``self.instructions`` in place.

        Args:
            predictions (InternalDataFrame): The predictions made by the skill.
            train_skill_output (str): The name of the output field of the skill.
            feedback (InternalDataFrame): The feedback provided by the user.
            runtime (Runtime): The runtime instance to be used for processing (CURRENTLY SUPPORTS ONLY `OpenAIChatRuntime`).
            add_cot (bool): Flag indicating if the skill should use the Chain-of-Thought strategy. Defaults to False.
        """
        if feedback.feedback[train_skill_output].isna().all():
            # No feedback left - nothing to improve
            return

        if feedback.match[train_skill_output].all():
            # all feedback is "correct" - nothing to improve
            return

        # Suffix feedback columns that collide with prediction columns so the
        # merged frame keeps both (e.g. "output" and "output__fb").
        fb = feedback.feedback.rename(
            columns=lambda x: x + "__fb" if x in predictions.columns else x
        )
        analyzed_df = fb.merge(predictions, left_index=True, right_index=True)

        examples = []

        for i, row in enumerate(analyzed_df.to_dict(orient="records")):
            # if fb marked as NaN, skip
            # NOTE(review): float("nan") is truthy, so `not row[...]` skips only
            # empty/None/False feedback, not actual NaN values - confirm NaNs
            # are filtered or normalized upstream.
            if not row[f"{train_skill_output}__fb"]:
                continue
            examples.append(
                f"### Example #{i}\n\n"
                f"{self.input_template.format(**row)}\n\n"
                f"{self.output_template.format(**row)}\n\n"
                f'User feedback: {row[f"{train_skill_output}__fb"]}\n\n'
            )

        examples = "\n".join(examples)

        messages = [{"role": "system", "content": "You are a helpful assistant."}]

        # full template
        # (shows the model how prompt, input and output are concatenated;
        # placement of {prompt} depends on instructions_first)
        if self.instructions_first:
            full_template = f"""
{{prompt}}
{self.input_template}
{self.output_template}"""
        else:
            full_template = f"""
{self.input_template}
{{prompt}}
{self.output_template}"""

        messages += [
            {
                "role": "user",
                "content": f"""
A prompt is a text paragraph that outlines the expected actions and instructs the large language model (LLM) to \
generate a specific output. This prompt is concatenated with the input text, and the \
model then creates the required output.
This describes the full template how the prompt is concatenated with the input to produce the output:

```
{full_template}
```

Here:
- "{self.input_template}" is input template,
- "{{prompt}}" is the LLM prompt,
- "{self.output_template}" is the output template.

Model can produce erroneous output if a prompt is not well defined. \
In our collaboration, we’ll work together to refine a prompt. The process consists of two main steps:

## Step 1
I will provide you with the current prompt along with prediction examples. Each example contains the input text, the final prediction produced by the model, and the user feedback. \
User feedback indicates whether the model prediction is correct or not. \
Your task is to analyze the examples and user feedback, determining whether the \
existing instruction is describing the task reflected by these examples precisely, and suggests changes to the prompt to address the incorrect predictions.

## Step 2
Next, you will carefully review your reasoning in step 1, integrate the insights to refine the prompt, \
and provide me with the new prompt that improves the model’s performance.""",
            }
        ]

        messages += [
            {
                "role": "assistant",
                "content": "Sure, I’d be happy to help you with this prompt engineering problem. "
                "Please provide me with the current prompt and the examples with user feedback.",
            }
        ]

        messages += [
            {
                "role": "user",
                "content": f"""
## Current prompt
{self.instructions}

## Examples
{examples}

Summarize your analysis about incorrect predictions and suggest changes to the prompt.""",
            }
        ]
        # Step 1: ask the LLM to analyze the failing examples.
        reasoning = runtime.execute(messages)

        # Step 2: feed the analysis back and ask for the refined prompt.
        messages += [
            {"role": "assistant", "content": reasoning},
            {
                "role": "user",
                "content": f"""
Now please carefully review your reasoning in Step 1 and help with Step 2: refining the prompt.

## Current prompt
{self.instructions}

## Follow this guidance to refine the prompt:

1. The new prompt should should describe the task precisely, and address the points raised in the user feedback.

2. The new prompt should be similar to the current prompt, and only differ in the parts that address the issues you identified in Step 1.
    Example:
    - Current prompt: "Generate a summary of the input text."
    - New prompt: "Generate a summary of the input text. Pay attention to the original style."

3. Reply only with the new prompt. Do not include input and output templates in the prompt.
""",
            },
        ]

        if add_cot:
            cot_instructions = """

4. In the new prompt, you should ask the model to perform step-by-step reasoning, and provide rationale or explanations for its prediction before giving the final answer. \
Instruct the model to give the final answer at the end of the prompt, using the following template: "Final answer: <answer>".
    Example:
    - Current prompt: "Generate a summary of the input text."
    - New prompt: "Generate a summary of the input text. Explain your reasoning step-by-step. Use the following template to give the final answer at the end of the prompt: "Final answer: <answer>"."""
            messages[-1]["content"] += cot_instructions
        # display dialogue:
        for message in messages:
            print(f'"{{{message["role"]}}}":\n{message["content"]}')
        # The refined prompt becomes the skill's new instructions (mutates self).
        new_prompt = runtime.execute(messages)
        self.instructions = new_prompt

LinearSkillSet

Bases: SkillSet

Represents a sequence of skills that are acquired in a specific order to achieve a goal.

LinearSkillSet ensures that skills are applied in a sequential manner.

Attributes:

Name Type Description
skills Union[List[Skill], Dict[str, Skill]]

Provided skills

skill_sequence List[str]

Ordered list of skill names indicating the order in which they should be acquired.

Examples:

Create a LinearSkillSet with a list of skills specified as BaseSkill instances:
>>> from adala.skills import LinearSkillSet, TransformSkill, AnalysisSkill, ClassificationSkill
>>> skillset = LinearSkillSet(skills=[TransformSkill(), ClassificationSkill(), AnalysisSkill()])
Source code in adala/skills/skillset.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class LinearSkillSet(SkillSet):
    """
    Represents a sequence of skills that are acquired in a specific order to achieve a goal.

    LinearSkillSet ensures that skills are applied in a sequential manner.

    Attributes:
        skills (Union[List[Skill], Dict[str, Skill]]): Provided skills
        skill_sequence (List[str], optional): Ordered list of skill names indicating the order
                                              in which they should be acquired.

    Examples:

        Create a LinearSkillSet with a list of skills specified as BaseSkill instances:
        >>> from adala.skills import LinearSkillSet, TransformSkill, AnalysisSkill, ClassificationSkill
        >>> skillset = LinearSkillSet(skills=[TransformSkill(), ClassificationSkill(), AnalysisSkill()])
    """

    # Explicit ordering of skill names; filled in by the validator below when omitted.
    skill_sequence: List[str] = None

    @model_validator(mode="after")
    def skill_sequence_validator(self) -> "LinearSkillSet":
        """
        Validates and sets the default order for the skill sequence if not provided.

        Returns:
            LinearSkillSet: The current instance with updated skill_sequence attribute.

        Raises:
            ValueError: If skill_sequence does not cover all skills.
        """
        if self.skill_sequence is None:
            # default to the insertion order of the skills dict
            self.skill_sequence = list(self.skills.keys())
        if len(self.skill_sequence) != len(self.skills):
            raise ValueError(
                f"skill_sequence must contain all skill names - "
                f"length of skill_sequence is {len(self.skill_sequence)} "
                f"while length of skills is {len(self.skills)}"
            )
        return self

    def apply(
        self,
        input: Union[Record, InternalDataFrame],
        runtime: Runtime,
        improved_skill: Optional[str] = None,
    ) -> InternalDataFrame:
        """
        Sequentially applies each skill on the dataset.

        Args:
            input (InternalDataFrame): Input dataset.
            runtime (Runtime): The runtime environment in which to apply the skills.
            improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None.
        Returns:
            InternalDataFrame: Skill predictions.
        """
        if improved_skill:
            # start from the specified skill, assuming previous skills have already been applied
            skill_sequence = self.skill_sequence[
                self.skill_sequence.index(improved_skill) :
            ]
        else:
            skill_sequence = self.skill_sequence
        skill_input = input
        # the enumerate() index was unused - iterate names directly
        for skill_name in skill_sequence:
            skill = self.skills[skill_name]
            print_text(f"Applying skill: {skill_name}")
            skill_output = skill.apply(skill_input, runtime)
            print_dataframe(skill_output)
            if isinstance(skill, TransformSkill):
                # Columns to drop from skill_input because they are also in skill_output
                cols_to_drop = set(skill_output.columns) & set(skill_input.columns)
                skill_input_reduced = skill_input.drop(columns=cols_to_drop)

                skill_input = skill_input_reduced.merge(
                    skill_output, left_index=True, right_index=True, how="inner"
                )
            elif isinstance(skill, (AnalysisSkill, SynthesisSkill)):
                # analysis/synthesis output replaces the working data entirely
                skill_input = skill_output
            else:
                raise ValueError(f"Unsupported skill type: {type(skill)}")
        if isinstance(skill_input, InternalSeries):
            # normalize a single-record series into a one-row dataframe
            skill_input = skill_input.to_frame().T
        return skill_input

    async def aapply(
        self,
        input: Union[Record, InternalDataFrame],
        runtime: AsyncRuntime,
        improved_skill: Optional[str] = None,
    ) -> InternalDataFrame:
        """
        Sequentially and asynchronously applies each skill on the dataset.

        Args:
            input (InternalDataFrame): Input dataset.
            runtime (AsyncRuntime): The runtime environment in which to apply the skills.
            improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None.
        Returns:
            InternalDataFrame: Skill predictions.
        """
        if improved_skill:
            # start from the specified skill, assuming previous skills have already been applied
            skill_sequence = self.skill_sequence[
                self.skill_sequence.index(improved_skill) :
            ]
        else:
            skill_sequence = self.skill_sequence
        skill_input = input
        # the enumerate() index was unused - iterate names directly
        for skill_name in skill_sequence:
            skill = self.skills[skill_name]
            print_text(f"Applying skill: {skill_name}")
            skill_output = await skill.aapply(skill_input, runtime)
            print_dataframe(skill_output)
            if isinstance(skill, TransformSkill):
                # Columns to drop from skill_input because they are also in skill_output
                cols_to_drop = set(skill_output.columns) & set(skill_input.columns)
                skill_input_reduced = skill_input.drop(columns=cols_to_drop)

                skill_input = skill_input_reduced.merge(
                    skill_output, left_index=True, right_index=True, how="inner"
                )
            elif isinstance(skill, (AnalysisSkill, SynthesisSkill)):
                # analysis/synthesis output replaces the working data entirely
                skill_input = skill_output
            else:
                raise ValueError(f"Unsupported skill type: {type(skill)}")
        if isinstance(skill_input, InternalSeries):
            # normalize a single-record series into a one-row dataframe
            skill_input = skill_input.to_frame().T
        return skill_input

    def __rich__(self):
        """Returns a rich representation of the skill."""
        # TODO: move it to a base class and use repr derived from Skills
        text = f"[bold blue]Total Agent Skills: {len(self.skills)}[/bold blue]\n\n"
        for skill in self.skills.values():
            text += (
                f"[bold underline green]{skill.name}[/bold underline green]\n"
                # fixed: close the green tag with [/green] instead of reopening it
                f"[green]{skill.instructions}[/green]\n"
            )
        return text

__rich__()

Returns a rich representation of the skill.

Source code in adala/skills/skillset.py
267
268
269
270
271
272
273
274
275
276
def __rich__(self):
    """Returns a rich representation of the skill."""
    # TODO: move it to a base class and use repr derived from Skills
    text = f"[bold blue]Total Agent Skills: {len(self.skills)}[/bold blue]\n\n"
    for skill in self.skills.values():
        text += (
            f"[bold underline green]{skill.name}[/bold underline green]\n"
            f"[green]{skill.instructions}[green]\n"
        )
    return text

aapply(input, runtime, improved_skill=None) async

Sequentially and asynchronously applies each skill on the dataset.

Parameters:

Name Type Description Default
input InternalDataFrame

Input dataset.

required
runtime AsyncRuntime

The runtime environment in which to apply the skills.

required
improved_skill Optional[str]

Name of the skill to improve. Defaults to None.

None

Returns: InternalDataFrame: Skill predictions.

Source code in adala/skills/skillset.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
async def aapply(
    self,
    input: Union[Record, InternalDataFrame],
    runtime: AsyncRuntime,
    improved_skill: Optional[str] = None,
) -> InternalDataFrame:
    """
    Sequentially and asynchronously applies each skill on the dataset.

    Args:
        input (InternalDataFrame): Input dataset.
        runtime (AsyncRuntime): The runtime environment in which to apply the skills.
        improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None.
    Returns:
        InternalDataFrame: Skill predictions.
    """
    if improved_skill:
        # start from the specified skill, assuming previous skills have already been applied
        skill_sequence = self.skill_sequence[
            self.skill_sequence.index(improved_skill) :
        ]
    else:
        skill_sequence = self.skill_sequence
    skill_input = input
    # the enumerate() index was unused - iterate names directly
    for skill_name in skill_sequence:
        skill = self.skills[skill_name]
        print_text(f"Applying skill: {skill_name}")
        skill_output = await skill.aapply(skill_input, runtime)
        print_dataframe(skill_output)
        if isinstance(skill, TransformSkill):
            # Columns to drop from skill_input because they are also in skill_output
            cols_to_drop = set(skill_output.columns) & set(skill_input.columns)
            skill_input_reduced = skill_input.drop(columns=cols_to_drop)

            skill_input = skill_input_reduced.merge(
                skill_output, left_index=True, right_index=True, how="inner"
            )
        elif isinstance(skill, (AnalysisSkill, SynthesisSkill)):
            # analysis/synthesis output replaces the working data entirely
            skill_input = skill_output
        else:
            raise ValueError(f"Unsupported skill type: {type(skill)}")
    if isinstance(skill_input, InternalSeries):
        # normalize a single-record series into a one-row dataframe
        skill_input = skill_input.to_frame().T
    return skill_input

apply(input, runtime, improved_skill=None)

Sequentially applies each skill on the dataset.

Parameters:

Name Type Description Default
input InternalDataFrame

Input dataset.

required
runtime Runtime

The runtime environment in which to apply the skills.

required
improved_skill Optional[str]

Name of the skill to improve. Defaults to None.

None

Returns: InternalDataFrame: Skill predictions.

Source code in adala/skills/skillset.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def apply(
    self,
    input: Union[Record, InternalDataFrame],
    runtime: Runtime,
    improved_skill: Optional[str] = None,
) -> InternalDataFrame:
    """
    Sequentially applies each skill on the dataset.

    Args:
        input (InternalDataFrame): Input dataset.
        runtime (Runtime): The runtime environment in which to apply the skills.
        improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None.
    Returns:
        InternalDataFrame: Skill predictions.
    """
    if improved_skill:
        # start from the specified skill, assuming previous skills have already been applied
        skill_sequence = self.skill_sequence[
            self.skill_sequence.index(improved_skill) :
        ]
    else:
        skill_sequence = self.skill_sequence
    skill_input = input
    # the enumerate() index was unused - iterate names directly
    for skill_name in skill_sequence:
        skill = self.skills[skill_name]
        print_text(f"Applying skill: {skill_name}")
        skill_output = skill.apply(skill_input, runtime)
        print_dataframe(skill_output)
        if isinstance(skill, TransformSkill):
            # Columns to drop from skill_input because they are also in skill_output
            cols_to_drop = set(skill_output.columns) & set(skill_input.columns)
            skill_input_reduced = skill_input.drop(columns=cols_to_drop)

            skill_input = skill_input_reduced.merge(
                skill_output, left_index=True, right_index=True, how="inner"
            )
        elif isinstance(skill, (AnalysisSkill, SynthesisSkill)):
            # analysis/synthesis output replaces the working data entirely
            skill_input = skill_output
        else:
            raise ValueError(f"Unsupported skill type: {type(skill)}")
    if isinstance(skill_input, InternalSeries):
        # normalize a single-record series into a one-row dataframe
        skill_input = skill_input.to_frame().T
    return skill_input

skill_sequence_validator()

Validates and sets the default order for the skill sequence if not provided.

Returns:

Name Type Description
LinearSkillSet LinearSkillSet

The current instance with updated skill_sequence attribute.

Source code in adala/skills/skillset.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@model_validator(mode="after")
def skill_sequence_validator(self) -> "LinearSkillSet":
    """
    Ensure a skill sequence exists and names every skill.

    If no explicit sequence was given, default to the insertion order of
    the skills dictionary; otherwise verify the sequence covers all skills.

    Returns:
        LinearSkillSet: The current instance with updated skill_sequence attribute.
    """
    if self.skill_sequence is None:
        # fall back to the order in which skills were registered
        self.skill_sequence = list(self.skills.keys())
    sequence_len, skills_len = len(self.skill_sequence), len(self.skills)
    if sequence_len != skills_len:
        raise ValueError(
            f"skill_sequence must contain all skill names - "
            f"length of skill_sequence is {sequence_len} "
            f"while length of skills is {skills_len}"
        )
    return self

ParallelSkillSet

Bases: SkillSet

Represents a set of skills that are acquired simultaneously to reach a goal.

In a ParallelSkillSet, each skill can be developed independently of the others. This is useful for agents that require multiple, diverse capabilities, or tasks where each skill contributes a piece of the overall solution.

Examples:

Create a ParallelSkillSet with a list of skills specified as BaseSkill instances

>>> from adala.skills import ParallelSkillSet, ClassificationSkill, TransformSkill
>>> skillset = ParallelSkillSet(skills=[ClassificationSkill(), TransformSkill()])
Source code in adala/skills/skillset.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
class ParallelSkillSet(SkillSet):
    """
    Represents a set of skills that are acquired simultaneously to reach a goal.

    In a ParallelSkillSet, each skill can be developed independently of the others. This is useful
    for agents that require multiple, diverse capabilities, or tasks where each skill contributes a piece of
    the overall solution.

    Examples:
        Create a ParallelSkillSet with a list of skills specified as BaseSkill instances
        >>> from adala.skills import ParallelSkillSet, ClassificationSkill, TransformSkill
        >>> skillset = ParallelSkillSet(skills=[ClassificationSkill(), TransformSkill()])
    """

    def apply(
        self,
        input: Union[InternalSeries, InternalDataFrame],
        runtime: Runtime,
        improved_skill: Optional[str] = None,
    ) -> InternalDataFrame:
        """
        Applies each skill on the dataset, enhancing the agent's experience.

        Args:
            input (Union[Record, InternalDataFrame]): Input data
            runtime (Runtime): The runtime environment in which to apply the skills.
            improved_skill (Optional[str], optional): If set, only that skill is applied. Defaults to None.
        Returns:
            Union[Record, InternalDataFrame]: Skill predictions.
        """
        if improved_skill:
            # apply only the specified skill
            skill_sequence = [improved_skill]
        else:
            skill_sequence = list(self.skills.keys())

        # every skill sees the same original input (parallel, not chained)
        skill_outputs = []
        # the enumerate() index was unused - iterate names directly
        for skill_name in skill_sequence:
            skill = self.skills[skill_name]
            print_text(f"Applying skill: {skill_name}")
            skill_output = skill.apply(input, runtime)
            skill_outputs.append(skill_output)
        if not skill_outputs:
            return InternalDataFrame()
        else:
            if isinstance(skill_outputs[0], InternalDataFrame):
                skill_outputs = InternalDataFrameConcat(skill_outputs, axis=1)
                cols_to_drop = set(input.columns) & set(skill_outputs.columns)
                skill_input_reduced = input.drop(columns=cols_to_drop)

                return skill_input_reduced.merge(
                    skill_outputs, left_index=True, right_index=True, how="inner"
                )
            elif isinstance(skill_outputs[0], (dict, InternalSeries)):
                # concatenate output to each row of input
                # NOTE(review): a plain dict has no .index attribute, so this
                # branch appears to work only for InternalSeries outputs -
                # confirm dict outputs are converted upstream.
                output = skill_outputs[0]
                return InternalDataFrameConcat(
                    [
                        input,
                        InternalDataFrame(
                            [output] * len(input),
                            columns=output.index,
                            index=input.index,
                        ),
                    ],
                    axis=1,
                )
            else:
                raise ValueError(f"Unsupported output type: {type(skill_outputs[0])}")

apply(input, runtime, improved_skill=None)

Applies each skill on the dataset, enhancing the agent's experience.

Parameters:

Name Type Description Default
input Union[Record, InternalDataFrame]

Input data

required
runtime Runtime

The runtime environment in which to apply the skills.

required
improved_skill Optional[str]

Unused in ParallelSkillSet. Defaults to None.

None

Returns: Union[Record, InternalDataFrame]: Skill predictions.

Source code in adala/skills/skillset.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def apply(
    self,
    input: Union[InternalSeries, InternalDataFrame],
    runtime: Runtime,
    improved_skill: Optional[str] = None,
) -> InternalDataFrame:
    """
    Applies each skill on the dataset, enhancing the agent's experience.

    Args:
        input (Union[Record, InternalDataFrame]): Input data
        runtime (Runtime): The runtime environment in which to apply the skills.
        improved_skill (Optional[str], optional): If set, only that skill is applied. Defaults to None.
    Returns:
        Union[Record, InternalDataFrame]: Skill predictions.
    """
    if improved_skill:
        # apply only the specified skill
        skill_sequence = [improved_skill]
    else:
        skill_sequence = list(self.skills.keys())

    # every skill sees the same original input (parallel, not chained)
    skill_outputs = []
    # the enumerate() index was unused - iterate names directly
    for skill_name in skill_sequence:
        skill = self.skills[skill_name]
        print_text(f"Applying skill: {skill_name}")
        skill_output = skill.apply(input, runtime)
        skill_outputs.append(skill_output)
    if not skill_outputs:
        return InternalDataFrame()
    else:
        if isinstance(skill_outputs[0], InternalDataFrame):
            skill_outputs = InternalDataFrameConcat(skill_outputs, axis=1)
            cols_to_drop = set(input.columns) & set(skill_outputs.columns)
            skill_input_reduced = input.drop(columns=cols_to_drop)

            return skill_input_reduced.merge(
                skill_outputs, left_index=True, right_index=True, how="inner"
            )
        elif isinstance(skill_outputs[0], (dict, InternalSeries)):
            # concatenate output to each row of input
            # NOTE(review): a plain dict has no .index attribute, so this
            # branch appears to work only for InternalSeries outputs -
            # confirm dict outputs are converted upstream.
            output = skill_outputs[0]
            return InternalDataFrameConcat(
                [
                    input,
                    InternalDataFrame(
                        [output] * len(input),
                        columns=output.index,
                        index=input.index,
                    ),
                ],
                axis=1,
            )
        else:
            raise ValueError(f"Unsupported output type: {type(skill_outputs[0])}")

SkillSet

Bases: BaseModel, ABC

Represents a collection of interdependent skills aiming to achieve a specific goal.

A skill set breaks down the path to achieve a goal into necessary precursor skills. Agents can evolve these skills either in parallel for tasks like self-consistency or sequentially for complex problem decompositions and causal reasoning. In the most generic cases, task decomposition can involve a graph-based approach.

Attributes:

Name Type Description
skills Dict[str, Skill]

A dictionary of skills in the skill set.

Source code in adala/skills/skillset.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class SkillSet(BaseModel, ABC):
    """
    Represents a collection of interdependent skills aiming to achieve a specific goal.

    A skill set breaks down the path to achieve a goal into necessary precursor skills.
    Agents can evolve these skills either in parallel for tasks like self-consistency or
    sequentially for complex problem decompositions and causal reasoning. In the most generic
    cases, task decomposition can involve a graph-based approach.

    Attributes:
        skills (Dict[str, Skill]): A dictionary of skills in the skill set.
    """

    skills: Union[List, Dict[str, Skill]]

    @field_validator("skills", mode="before")
    def skills_validator(cls, v: Union[List, Dict]) -> Dict[str, Skill]:
        """
        Validates and converts the skills attribute to a dictionary of skill names to Skill instances.

        Args:
            v (Union[List[Skill], Dict[str, Skill]]): The skills attribute to validate and convert.

        Returns:
            Dict[str, Skill]: Dictionary mapping skill names to their corresponding Skill instances.

        Raises:
            ValueError: If ``v`` is neither a list nor a dict, or if a list item
                is not a Skill instance or a dict with 'type' and 'name' keys.
        """
        skills = OrderedDict()
        if not v:
            return skills

        if isinstance(v, list):
            # Dispatch per item (rather than by the type of v[0]) so that mixed
            # or malformed lists fail with a clear error instead of an
            # AttributeError, or being silently dropped.
            for skill in v:
                if isinstance(skill, Skill):
                    # Skill instances are keyed by their own name.
                    skills[skill.name] = skill
                elif isinstance(skill, dict):
                    if "type" not in skill:
                        raise ValueError("Skill dictionary must contain a 'type' key")
                    if "name" not in skill:
                        raise ValueError("Skill dictionary must contain a 'name' key")
                    # Work on a copy so the caller's dict is not mutated by pop().
                    params = dict(skill)
                    skills[params["name"]] = Skill.create_from_registry(
                        params.pop("type"), **params
                    )
                else:
                    raise ValueError(
                        f"skill list items must be Skill instances or dictionaries, "
                        f"but received type {type(skill)}"
                    )
        elif isinstance(v, dict):
            skills = v
        else:
            raise ValueError(
                f"skills must be a list or dictionary, but received type {type(v)}"
            )
        return skills

    @abstractmethod
    def apply(
        self,
        input: Union[Record, InternalDataFrame],
        runtime: Runtime,
        improved_skill: Optional[str] = None,
    ) -> InternalDataFrame:
        """
        Apply the skill set to a dataset using a specified runtime.

        Args:
            input (Union[Record, InternalDataFrame]): Input data to apply the skill set to.
            runtime (Runtime): The runtime environment in which to apply the skills.
            improved_skill (Optional[str], optional): Name of the skill to start from (to optimize calculations). Defaults to None.
        Returns:
            InternalDataFrame: Skill predictions.
        """

    def __getitem__(self, skill_name) -> Skill:
        """
        Select skill by name.

        Args:
            skill_name (str): Name of the skill to select.

        Returns:
            Skill: The skill registered under ``skill_name``.

        Raises:
            KeyError: If no skill with the given name exists.
        """
        return self.skills[skill_name]

    def __setitem__(self, skill_name, skill: Skill):
        """
        Set skill by name.

        Args:
            skill_name (str): Name of the skill to set.
            skill (Skill): Skill to set.
        """
        self.skills[skill_name] = skill

    def get_skill_names(self) -> List[str]:
        """
        Get list of skill names.

        Returns:
            List[str]: List of skill names, in insertion order.
        """
        return list(self.skills.keys())

    def get_skill_outputs(self) -> Dict[str, str]:
        """
        Get dictionary of skill outputs.

        Returns:
            Dict[str, str]: Dictionary of skill outputs. Keys are output names and values are skill names.
        """
        return {
            field: skill.name
            for skill in self.skills.values()
            for field in skill.get_output_fields()
        }

__getitem__(skill_name)

Select skill by name.

Parameters:

Name Type Description Default
skill_name str

Name of the skill to select.

required

Returns:

Name Type Description
BaseSkill Skill

Skill

Source code in adala/skills/skillset.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __getitem__(self, skill_name) -> Skill:
    """
    Look up a skill in the set by its name.

    Args:
        skill_name (str): Name of the skill to select.

    Returns:
        Skill: The skill registered under ``skill_name``.
    """
    selected = self.skills[skill_name]
    return selected

__setitem__(skill_name, skill)

Set skill by name.

Parameters:

Name Type Description Default
skill_name str

Name of the skill to set.

required
skill BaseSkill

Skill to set.

required
Source code in adala/skills/skillset.py
103
104
105
106
107
108
109
110
111
def __setitem__(self, skill_name, skill: Skill):
    """
    Register (or replace) a skill under the given name.

    Args:
        skill_name (str): Name of the skill to set.
        skill (Skill): Skill to set.
    """
    self.skills.update({skill_name: skill})

apply(input, runtime, improved_skill=None) abstractmethod

Apply the skill set to a dataset using a specified runtime.

Parameters:

Name Type Description Default
input Union[Record, InternalDataFrame]

Input data to apply the skill set to.

required
runtime Runtime

The runtime environment in which to apply the skills.

required
improved_skill Optional[str]

Name of the skill to start from (to optimize calculations). Defaults to None.

None

Returns: InternalDataFrame: Skill predictions.

Source code in adala/skills/skillset.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@abstractmethod
def apply(
    self,
    input: Union[Record, InternalDataFrame],
    runtime: Runtime,
    improved_skill: Optional[str] = None,
) -> InternalDataFrame:
    """
    Apply the skill set to a dataset using a specified runtime.

    Abstract method: concrete skill sets define the execution order
    (parallel, sequential, graph-based) of their skills.

    Args:
        input (Union[Record, InternalDataFrame]): Input data to apply the skill set to.
        runtime (Runtime): The runtime environment in which to apply the skills.
        improved_skill (Optional[str], optional): Name of the skill to start from (to optimize calculations). Defaults to None.
    Returns:
        InternalDataFrame: Skill predictions.
    """

get_skill_names()

Get list of skill names.

Returns:

Type Description
List[str]

List[str]: List of skill names.

Source code in adala/skills/skillset.py
113
114
115
116
117
118
119
120
def get_skill_names(self) -> List[str]:
    """
    Return the names of all skills in the set.

    Returns:
        List[str]: List of skill names, in insertion order.
    """
    return [name for name in self.skills]

get_skill_outputs()

Get dictionary of skill outputs.

Returns:

Type Description
Dict[str, str]

Dict[str, str]: Dictionary of skill outputs. Keys are output names and values are skill names.

Source code in adala/skills/skillset.py
122
123
124
125
126
127
128
129
130
131
132
133
def get_skill_outputs(self) -> Dict[str, str]:
    """
    Map each output field produced by the skill set to the producing skill.

    Returns:
        Dict[str, str]: Dictionary of skill outputs. Keys are output names and values are skill names
    """
    outputs = {}
    for skill in self.skills.values():
        # Later skills overwrite earlier ones on duplicate field names,
        # matching dict-comprehension semantics.
        for field in skill.get_output_fields():
            outputs[field] = skill.name
    return outputs

skills_validator(v)

Validates and converts the skills attribute to a dictionary of skill names to BaseSkill instances.

Parameters:

Name Type Description Default
v Union[List[Skill], Dict[str, Skill]]

The skills attribute to validate and convert.

required

Returns:

Type Description
Dict[str, Skill]

Dict[str, BaseSkill]: Dictionary mapping skill names to their corresponding BaseSkill instances.

Source code in adala/skills/skillset.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@field_validator("skills", mode="before")
def skills_validator(cls, v: Union[List, Dict]) -> Dict[str, Skill]:
    """
    Validates and converts the skills attribute to a dictionary of skill names to Skill instances.

    Args:
        v (Union[List[Skill], Dict[str, Skill]]): The skills attribute to validate and convert.

    Returns:
        Dict[str, Skill]: Dictionary mapping skill names to their corresponding Skill instances.

    Raises:
        ValueError: If ``v`` is neither a list nor a dict, or if a list item
            is not a Skill instance or a dict with 'type' and 'name' keys.
    """
    skills = OrderedDict()
    if not v:
        return skills

    if isinstance(v, list):
        # Dispatch per item (rather than by the type of v[0]) so that mixed
        # or malformed lists fail with a clear error instead of an
        # AttributeError, or being silently dropped.
        for skill in v:
            if isinstance(skill, Skill):
                # Skill instances are keyed by their own name.
                skills[skill.name] = skill
            elif isinstance(skill, dict):
                if "type" not in skill:
                    raise ValueError("Skill dictionary must contain a 'type' key")
                if "name" not in skill:
                    raise ValueError("Skill dictionary must contain a 'name' key")
                # Work on a copy so the caller's dict is not mutated by pop().
                params = dict(skill)
                skills[params["name"]] = Skill.create_from_registry(
                    params.pop("type"), **params
                )
            else:
                raise ValueError(
                    f"skill list items must be Skill instances or dictionaries, "
                    f"but received type {type(skill)}"
                )
    elif isinstance(v, dict):
        skills = v
    else:
        raise ValueError(
            f"skills must be a list or dictionary, but received type {type(v)}"
        )
    return skills