Skip to content

Skills

AnalysisSkill

Bases: Skill

Analysis skill that analyzes a dataframe and returns a record (e.g. for data analysis purposes). See base class Skill for more information about the attributes.

Source code in adala/skills/_base.py
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
class AnalysisSkill(Skill):
    """
    Analysis skill that analyzes a dataframe and returns a record (e.g. for data analysis purposes).
    See base class Skill for more information about the attributes.
    """

    # Prepended verbatim to each aggregated chunk string before it is sent to the runtime.
    input_prefix: str = ""
    # Separator placed between interpolated rows when a chunk is flattened to one string.
    input_separator: str = "\n"
    # If set, the input dataframe is split into chunks of this many rows;
    # each chunk produces one output record. If None, the whole input is one chunk.
    chunk_size: Optional[int] = None

    def _iter_over_chunks(
        self, input: InternalDataFrame, chunk_size: Optional[int] = None
    ):
        """
        Iterates over chunks of the input dataframe.

        Yields strings that are the concatenation of the rows of each chunk
        (interpolated with `input_template` and `extra_fields`) joined by
        `input_separator`.

        Args:
            input: The input data (dataframe, series, or a single dict record).
            chunk_size: Optional per-call override of `self.chunk_size`.
                Previously this parameter was accepted but silently ignored.
        """
        # Normalize to a dataframe *before* touching `.empty`: a plain dict has
        # no `.empty` attribute, so checking emptiness first raised AttributeError
        # for dict inputs.
        if isinstance(input, InternalSeries):
            input = input.to_frame()
        elif isinstance(input, dict):
            input = InternalDataFrame([input])

        if input.empty:
            yield ""
            return

        extra_fields = self._get_extra_fields()

        # Allow a call-site override of the configured chunk size.
        effective_chunk_size = (
            chunk_size if chunk_size is not None else self.chunk_size
        )

        # if chunk_size is specified, split the input into chunks and process each chunk separately
        if effective_chunk_size is not None:
            chunks = (
                input.iloc[i : i + effective_chunk_size]
                for i in range(0, len(input), effective_chunk_size)
            )
            # ceiling division: a trailing partial chunk still counts toward the total
            total = -(-len(input) // effective_chunk_size)
        else:
            chunks = [input]
            total = 1

        # define the row preprocessing function
        def row_preprocessing(row):
            # `i` exposes the 1-based row number to the template
            return partial_str_format(
                self.input_template, **row, **extra_fields, i=int(row.name) + 1
            )

        for chunk in tqdm(chunks, desc="Processing chunks", total=total):
            # interpolate every row with input_template and concatenate them with input_separator to produce a single string
            agg_chunk = (
                chunk.reset_index()
                .apply(row_preprocessing, axis=1)
                .str.cat(sep=self.input_separator)
            )
            yield agg_chunk

    def apply(
        self,
        input: Union[InternalDataFrame, InternalSeries, Dict],
        runtime: Runtime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a dataframe and returns a record.

        Args:
            input (Union[InternalDataFrame, InternalSeries, Dict]): The input data to be processed.
            runtime (Runtime): The runtime instance to be used for processing.

        Returns:
            InternalDataFrame: One record (row) per processed chunk.
        """
        outputs = []
        for agg_chunk in self._iter_over_chunks(input):
            # The chunk text is already fully interpolated, so a pass-through
            # "{input}" template is used here.
            output = runtime.record_to_record(
                {"input": f"{self.input_prefix}{agg_chunk}"},
                input_template="{input}",
                output_template=self.output_template,
                instructions_template=self.instructions,
                instructions_first=self.instructions_first,
                response_model=self.response_model,
            )
            outputs.append(InternalSeries(output))
        output = InternalDataFrame(outputs)

        return output

    async def aapply(
        self,
        input: Union[InternalDataFrame, InternalSeries, Dict],
        runtime: AsyncRuntime,
    ) -> InternalDataFrame:
        """
        Async variant of `apply`: applies the skill to a dataframe and returns a record.

        Args:
            input (Union[InternalDataFrame, InternalSeries, Dict]): The input data to be processed.
            runtime (AsyncRuntime): The async runtime instance to be used for processing.

        Returns:
            InternalDataFrame: One record (row) per processed chunk.
        """
        outputs = []
        for agg_chunk in self._iter_over_chunks(input):
            output = await runtime.record_to_record(
                {"input": f"{self.input_prefix}{agg_chunk}"},
                input_template="{input}",
                output_template=self.output_template,
                instructions_template=self.instructions,
                instructions_first=self.instructions_first,
                response_model=self.response_model,
            )
            outputs.append(InternalSeries(output))
        output = InternalDataFrame(outputs)

        return output

    def improve(self, **kwargs):
        """
        Improves the skill.

        Raises:
            NotImplementedError: improvement is not supported for analysis skills.
        """
        raise NotImplementedError

aapply(input, runtime) async

Applies the skill to a dataframe and returns a record.

Source code in adala/skills/_base.py
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
async def aapply(
    self,
    input: Union[InternalDataFrame, InternalSeries, Dict],
    runtime: AsyncRuntime,
) -> InternalDataFrame:
    """
    Applies the skill to a dataframe and returns a record.

    Args:
        input (Union[InternalDataFrame, InternalSeries, Dict]): The input data to be processed.
        runtime (AsyncRuntime): The async runtime instance to be used for processing.

    Returns:
        InternalDataFrame: One record (row) per processed chunk.
    """
    outputs = []
    # Each aggregated chunk becomes one async runtime call; the chunk text is
    # already interpolated, so a pass-through "{input}" template is used.
    for agg_chunk in self._iter_over_chunks(input):
        output = await runtime.record_to_record(
            {"input": f"{self.input_prefix}{agg_chunk}"},
            input_template="{input}",
            output_template=self.output_template,
            instructions_template=self.instructions,
            instructions_first=self.instructions_first,
            response_model=self.response_model,
        )
        # presumably record_to_record returns a dict-like record — wrapped as a Series row
        outputs.append(InternalSeries(output))
    output = InternalDataFrame(outputs)

    return output

apply(input, runtime)

Applies the skill to a dataframe and returns a record.

Parameters:

Name Type Description Default
input InternalDataFrame

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalSeries InternalDataFrame

The record containing the analysis results.

Source code in adala/skills/_base.py
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
def apply(
    self,
    input: Union[InternalDataFrame, InternalSeries, Dict],
    runtime: Runtime,
) -> InternalDataFrame:
    """
    Applies the skill to a dataframe and returns a record.

    Args:
        input (InternalDataFrame): The input data to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalSeries: The record containing the analysis results.
    """
    outputs = []
    # One runtime call per aggregated chunk; the chunk text is already fully
    # interpolated, so a pass-through "{input}" template is used here.
    for agg_chunk in self._iter_over_chunks(input):
        output = runtime.record_to_record(
            {"input": f"{self.input_prefix}{agg_chunk}"},
            input_template="{input}",
            output_template=self.output_template,
            instructions_template=self.instructions,
            instructions_first=self.instructions_first,
            response_model=self.response_model,
        )
        # presumably record_to_record returns a dict-like record — wrapped as a Series row
        outputs.append(InternalSeries(output))
    output = InternalDataFrame(outputs)

    return output

improve(**kwargs)

Improves the skill.

Source code in adala/skills/_base.py
713
714
715
716
717
def improve(self, **kwargs):
    """
    Improves the skill.

    Raises:
        NotImplementedError: improvement is not supported for this skill type.
    """
    raise NotImplementedError

SampleTransformSkill

Bases: TransformSkill

Source code in adala/skills/_base.py
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
class SampleTransformSkill(TransformSkill):
    # Number of rows randomly sampled from the input before the transform runs.
    sample_size: int

    def apply(
        self,
        input: InternalDataFrame,
        runtime: Runtime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a random sample of a dataframe and returns a dataframe.

        Args:
            input (InternalDataFrame): The input data to be processed.
            runtime (Runtime): The runtime instance to be used for processing.

        Returns:
            InternalDataFrame: The processed data.
        """
        # Draw the sample first, then delegate to the regular transform.
        sampled = input.sample(self.sample_size)
        return super().apply(sampled, runtime)

apply(input, runtime)

Applies the skill to a dataframe and returns a dataframe.

Parameters:

Name Type Description Default
input InternalDataFrame

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The processed data.

Source code in adala/skills/_base.py
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def apply(
    self,
    input: InternalDataFrame,
    runtime: Runtime,
) -> InternalDataFrame:
    """
    Applies the skill to a dataframe and returns a dataframe.

    Args:
        input (InternalDataFrame): The input data to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalDataFrame: The processed data.
    """
    # Sample `sample_size` rows, then run the parent transform on the sample only.
    return super(SampleTransformSkill, self).apply(
        input.sample(self.sample_size), runtime
    )

Skill

Bases: BaseModelInRegistry

Abstract base class representing a skill.

Provides methods to interact with and obtain information about skills.

Attributes:

Name Type Description
name str

Unique name of the skill.

instructions str

Instructs agent what to do with the input data.

input_template str

Template for the input data.

output_template str

Template for the output data.

description Optional[str]

Description of the skill.

field_schema Optional[Dict]

Field JSON schema to use in the templates. Defaults to treating all fields as strings, i.e. analogous to {"field_n": {"type": "string"}}.

extra_fields Optional[Dict[str, str]]

Extra fields to use in the templates. Defaults to None.

instructions_first bool

Flag indicating if instructions should be executed before input. Defaults to True.

verbose bool

Flag indicating if runtime outputs should be verbose. Defaults to False.

frozen bool

Flag indicating if the skill is frozen. Defaults to False.

response_model Optional[Type[BaseModel]]

Pydantic-based response model for the skill. If used, output_template and field_schema are ignored. Note that using response_model will become the default in the future.

type ClassVar[str]

Type of the skill.

Source code in adala/skills/_base.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class Skill(BaseModelInRegistry):
    """
    Abstract base class representing a skill.

    Provides methods to interact with and obtain information about skills.

    Attributes:
        name (str): Unique name of the skill.
        instructions (str): Instructs agent what to do with the input data.
        input_template (str): Template for the input data.
        output_template (str): Template for the output data.
        description (Optional[str]): Description of the skill.
        field_schema (Optional[Dict]): Field [JSON schema](https://json-schema.org/) to use in the templates. Defaults to all fields are strings,
            i.e. analogous to {"field_n": {"type": "string"}}.
        extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None.
        instructions_first (bool): Flag indicating if instructions should be executed before input. Defaults to True.
        verbose (bool): Flag indicating if runtime outputs should be verbose. Defaults to False.
        frozen (bool): Flag indicating if the skill is frozen. Defaults to False.
        response_model (Optional[Type[BaseModel]]): Pydantic-based response model for the skill. If used, `output_template` and `field_schema` are ignored. Note that using `response_model` will become the default in the future.
        type (ClassVar[str]): Type of the skill.
    """

    name: str = Field(
        title="Skill name",
        description="Unique name of the skill",
        examples=["labeling", "classification", "text-generation"],
    )
    instructions: str = Field(
        title="Skill instructions",
        description="Instructs agent what to do with the input data. "
        "Can use templating to refer to input fields.",
        examples=["Label the input text with the following labels: {labels}"],
        # TODO: instructions can be deprecated in favor of using `input_template` to specify the instructions
        default="",
    )
    input_template: str = Field(
        title="Input template",
        description="Template for the input data. "
        "Can use templating to refer to input parameters and perform data transformations.",
        examples=["Input: {input}", "Input: {input}\nLabels: {labels}\nOutput: "],
    )
    output_template: str = Field(
        title="Output template",
        description="Template for the output data. "
        "Can use templating to refer to input parameters and perform data transformations",
        examples=["Output: {output}", "{predictions}"],
        # TODO: output_template can be deprecated in favor of using `response_model` to specify the output
        default="",
    )
    description: Optional[str] = Field(
        default="",
        title="Skill description",
        description="Description of the skill. Can be used to retrieve skill from the library.",
        examples=["The skill to perform sentiment analysis on the input text."],
    )
    field_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        title="Field schema",
        description="JSON schema for the fields of the input and output data.",
        examples=[
            {
                "input": {"type": "string"},
                "output": {"type": "string"},
                "labels": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": ["positive", "negative", "neutral"],
                    },
                },
            }
        ],
    )
    instructions_first: bool = Field(
        default=True,
        title="Instructions first",
        description="Flag indicating if instructions should be shown before the input data.",
        examples=[True, False],
    )

    frozen: bool = Field(
        default=False,
        title="Frozen",
        description="Flag indicating if the skill is frozen.",
        examples=[True, False],
    )

    # NOTE(review): annotated as Type[BaseModel] but defaults to None, so it is
    # effectively Optional — confirm before tightening the annotation (changing
    # it would alter the pydantic schema).
    response_model: Type[BaseModel] = Field(
        default=None,
        title="Response model",
        description="Pydantic-based response model for the skill. If used, `output_template` and `field_schema` are ignored.",
    )

    def _get_extra_fields(self):
        """
        Retrieves fields that are not categorized as system fields.

        Returns:
            dict: A dictionary containing fields that are not system fields.
        """

        # TODO: more robust way to exclude system fields
        system_fields = {
            "name",
            "description",
            "input_template",
            "output_template",
            "instructions",
            "field_schema",
            "extra_fields",
            "instructions_first",
            "verbose",
            "frozen",
            "response_model",
            "type",
        }
        # Everything the model carries beyond the system fields is treated as
        # template interpolation data (e.g. `labels` in subclasses).
        extra_fields = self.model_dump(exclude=system_fields)
        return extra_fields

    def get_input_fields(self):
        """
        Retrieves input fields.

        Returns:
            List[str]: A list of input fields.
        """
        extra_fields = self._get_extra_fields()
        # Resolve extra fields first so only genuinely unbound template
        # variables are reported as input fields.
        input_fields = parse_template(
            partial_str_format(self.input_template, **extra_fields),
            include_texts=False,
        )
        return [f["text"] for f in input_fields]

    def get_output_fields(self):
        """
        Retrieves output fields.

        Returns:
            List[str]: A list of output fields.
        """
        # Precedence: explicit response_model > field_schema > parsed output_template.
        if self.response_model:
            return list(self.response_model.__fields__.keys())
        if self.field_schema:
            return list(self.field_schema.keys())

        extra_fields = self._get_extra_fields()
        # TODO: input fields are not considered - shall we disallow input fields in output template?
        output_fields = parse_template(
            partial_str_format(self.output_template, **extra_fields),
            include_texts=False,
        )
        return [f["text"] for f in output_fields]

    def _create_response_model_from_field_schema(self):
        """Build `response_model` from `field_schema` (no-op if already set)."""
        assert self.field_schema, "field_schema is required to create a response model"
        if self.response_model:
            return
        self.response_model = field_schema_to_pydantic_class(
            self.field_schema, self.name, self.description
        )

    @model_validator(mode="after")
    def validate_response_model(self):
        """Ensure every skill ends up with a `response_model`, deriving it from
        `field_schema` or, failing that, from `output_template`."""
        if self.response_model:
            # if response_model, we use it right away
            return self

        if not self.field_schema:
            # if field_schema is not provided, extract it from `output_template`
            logger.info(
                f"Parsing output_template to generate the response model: {self.output_template}"
            )
            self.field_schema = {}
            chunks = parse_template(self.output_template)

            # Track the literal text preceding each variable so it can double
            # as that field's description.
            previous_text = ""
            for chunk in chunks:
                if chunk["type"] == "text":
                    previous_text = chunk["text"]
                if chunk["type"] == "var":
                    field_name = chunk["text"]
                    # by default, all fields are strings
                    field_type = "string"

                    # if description is not provided, use the text before the field,
                    # otherwise use the field name with underscores replaced by spaces
                    field_description = previous_text or field_name.replace("_", " ")
                    field_description = field_description.strip(
                        string.punctuation
                    ).strip()
                    previous_text = ""

                    # create default JSON schema entry for the field
                    self.field_schema[field_name] = {
                        "type": field_type,
                        "description": field_description,
                    }

        self._create_response_model_from_field_schema()
        return self

    # When serializing the agent, ensure `response_model` is excluded.
    # It will be restored from `field_schema` during deserialization.
    @field_serializer("response_model")
    def serialize_response_model(self, value):
        return None

    # remove `response_model` from the pickle serialization
    def __getstate__(self):
        state = super().__getstate__()
        # Dynamically-created pydantic classes are not reliably picklable, so
        # the model is dropped here and rebuilt in __setstate__.
        state["__dict__"]["response_model"] = None
        return state

    def __setstate__(self, state):
        super().__setstate__(state)
        # ensure response_model is restored from field_schema, if not already set
        self._create_response_model_from_field_schema()

    @abstractmethod
    def apply(self, input, runtime):
        """
        Base method for applying the skill.
        """

    @abstractmethod
    def improve(self, predictions, train_skill_output, feedback, runtime):
        """
        Base method for improving the skill.
        """

apply(input, runtime) abstractmethod

Base method for applying the skill.

Source code in adala/skills/_base.py
246
247
248
249
250
@abstractmethod
def apply(self, input, runtime):
    """
    Base method for applying the skill.

    Subclasses implement the concrete input/output contract (dataframe,
    series, or record) — see TransformSkill, SynthesisSkill, AnalysisSkill.
    """

get_input_fields()

Retrieves input fields.

Returns:

Type Description

List[str]: A list of input fields.

Source code in adala/skills/_base.py
147
148
149
150
151
152
153
154
155
156
157
158
159
def get_input_fields(self):
    """
    Retrieves input fields.

    Returns:
        List[str]: A list of input fields.
    """
    # Substitute known extra fields first; whatever template variables remain
    # unbound are the skill's true input fields.
    extra_fields = self._get_extra_fields()
    rendered_template = partial_str_format(self.input_template, **extra_fields)
    parsed_fields = parse_template(rendered_template, include_texts=False)
    return [field["text"] for field in parsed_fields]

get_output_fields()

Retrieves output fields.

Returns:

Type Description

List[str]: A list of output fields.

Source code in adala/skills/_base.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def get_output_fields(self):
    """
    Retrieves output fields.

    Returns:
        List[str]: A list of output fields.
    """
    # Precedence: explicit response_model, then field_schema, then
    # whatever variables the output_template declares.
    if self.response_model:
        return list(self.response_model.__fields__)
    if self.field_schema:
        return list(self.field_schema)

    # TODO: input fields are not considered - shall we disallow input fields in output template?
    rendered = partial_str_format(self.output_template, **self._get_extra_fields())
    return [field["text"] for field in parse_template(rendered, include_texts=False)]

improve(predictions, train_skill_output, feedback, runtime) abstractmethod

Base method for improving the skill.

Source code in adala/skills/_base.py
252
253
254
255
256
@abstractmethod
def improve(self, predictions, train_skill_output, feedback, runtime):
    """
    Base method for improving the skill.

    Subclasses refine the skill's instructions/templates from predictions
    and feedback; skill types that cannot learn raise NotImplementedError.
    """

SynthesisSkill

Bases: Skill

Synthesis skill that synthesizes a dataframe from a record (e.g. for dataset generation purposes). See base class Skill for more information about the attributes.

Source code in adala/skills/_base.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
class SynthesisSkill(Skill):
    """
    Synthesis skill that synthesizes a dataframe from a record (e.g. for dataset generation purposes).
    See base class Skill for more information about the attributes.
    """

    def apply(
        self,
        input: Union[Dict, InternalSeries],
        runtime: Runtime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a record and returns a dataframe.

        Args:
            input (InternalSeries): The input data to be processed.
            runtime (Runtime): The runtime instance to be used for processing.

        Returns:
            InternalDataFrame: The synthesized data.
        """
        # record_to_batch expects a plain dict record, so normalize a Series.
        record = input.to_dict() if isinstance(input, InternalSeries) else input
        return runtime.record_to_batch(
            record,
            input_template=self.input_template,
            output_template=self.output_template,
            instructions_template=self.instructions,
            field_schema=self.field_schema,
            extra_fields=self._get_extra_fields(),
            instructions_first=self.instructions_first,
            response_model=self.response_model,
        )

    def improve(self, **kwargs):
        """
        Improves the skill.

        Raises:
            NotImplementedError: improvement is not supported for synthesis skills.
        """
        raise NotImplementedError

apply(input, runtime)

Applies the skill to a record and returns a dataframe.

Parameters:

Name Type Description Default
input InternalSeries

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The synthesized data.

Source code in adala/skills/_base.py
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
def apply(
    self,
    input: Union[Dict, InternalSeries],
    runtime: Runtime,
) -> InternalDataFrame:
    """
    Applies the skill to a record and returns a dataframe.

    Args:
        input (InternalSeries): The input data to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalDataFrame: The synthesized data.
    """
    # record_to_batch expects a plain dict record, so a Series is converted.
    if isinstance(input, InternalSeries):
        input = input.to_dict()
    return runtime.record_to_batch(
        input,
        input_template=self.input_template,
        output_template=self.output_template,
        instructions_template=self.instructions,
        field_schema=self.field_schema,
        extra_fields=self._get_extra_fields(),
        instructions_first=self.instructions_first,
        response_model=self.response_model,
    )

improve(**kwargs)

Improves the skill.

Source code in adala/skills/_base.py
598
599
600
601
602
def improve(self, **kwargs):
    """
    Improves the skill.

    Raises:
        NotImplementedError: improvement is not supported for this skill type.
    """
    raise NotImplementedError

TransformSkill

Bases: Skill

Transform skill that transforms a dataframe to another dataframe (e.g. for data annotation purposes). See base class Skill for more information about the attributes.

Source code in adala/skills/_base.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
class TransformSkill(Skill):
    """
    Transform skill that transforms a dataframe to another dataframe (e.g. for data annotation purposes).
    See base class Skill for more information about the attributes.
    """

    def apply(
        self,
        input: InternalDataFrame,
        runtime: Runtime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a dataframe and returns another dataframe.

        Args:
            input (InternalDataFrame): The input data to be processed.
            runtime (Runtime): The runtime instance to be used for processing.

        Returns:
            InternalDataFrame: The transformed data.
        """

        # All prompt configuration (templates, schema, response model) comes
        # from this skill; the runtime performs the per-row processing.
        return runtime.batch_to_batch(
            input,
            input_template=self.input_template,
            output_template=self.output_template,
            instructions_template=self.instructions,
            field_schema=self.field_schema,
            extra_fields=self._get_extra_fields(),
            instructions_first=self.instructions_first,
            response_model=self.response_model,
        )

    async def aapply(
        self,
        input: InternalDataFrame,
        runtime: AsyncRuntime,
    ) -> InternalDataFrame:
        """
        Applies the skill to a dataframe and returns another dataframe.

        Async counterpart of `apply`; passes the same configuration.

        Args:
            input (InternalDataFrame): The input data to be processed.
            runtime (AsyncRuntime): The async runtime instance to be used for processing.

        Returns:
            InternalDataFrame: The transformed data.
        """

        return await runtime.batch_to_batch(
            input,
            input_template=self.input_template,
            output_template=self.output_template,
            instructions_template=self.instructions,
            field_schema=self.field_schema,
            extra_fields=self._get_extra_fields(),
            instructions_first=self.instructions_first,
            response_model=self.response_model,
        )

    def improve(
        self,
        predictions: InternalDataFrame,
        train_skill_output: str,
        feedback,
        runtime: Runtime,
        add_cot: bool = False,
    ):
        """
        Improves the skill by asking an LLM to refine `self.instructions`
        based on prediction examples and user feedback.

        Args:
            predictions (InternalDataFrame): The predictions made by the skill.
            train_skill_output (str): The name of the output field of the skill.
            feedback (InternalDataFrame): The feedback provided by the user.
            runtime (Runtime): The runtime instance to be used for processing (CURRENTLY SUPPORTS ONLY `OpenAIChatRuntime`).
            add_cot (bool): Flag indicating whether the skill should use the Chain-of-Thought strategy. Defaults to False.
        """
        if feedback.feedback[train_skill_output].isna().all():
            # No feedback left - nothing to improve
            return

        if feedback.match[train_skill_output].all():
            # all feedback is "correct" - nothing to improve
            return

        # Suffix feedback columns that collide with prediction columns so the
        # index-aligned merge below keeps both versions.
        fb = feedback.feedback.rename(
            columns=lambda x: x + "__fb" if x in predictions.columns else x
        )
        analyzed_df = fb.merge(predictions, left_index=True, right_index=True)

        examples = []

        for i, row in enumerate(analyzed_df.to_dict(orient="records")):
            # if fb marked as NaN, skip
            # NOTE(review): this falsy check skips None/empty-string feedback,
            # but float("nan") is truthy, so NaN feedback values would NOT be
            # skipped here — confirm whether NaN rows can reach this point.
            if not row[f"{train_skill_output}__fb"]:
                continue

            # TODO: self.output_template can be missed or incompatible with the field_schema
            # we need to redefine how we create examples for learn()
            if not self.output_template:
                raise ValueError(
                    "`output_template` is required for improve() method and must contain "
                    "the output fields from `field_schema`"
                )
            examples.append(
                f"### Example #{i}\n\n"
                f"{partial_str_format(self.input_template, **row)}\n\n"
                f"{partial_str_format(self.output_template, **row)}\n\n"
                f'User feedback: {row[f"{train_skill_output}__fb"]}\n\n'
            )

        examples = "\n".join(examples)

        messages = [{"role": "system", "content": "You are a helpful assistant."}]

        # full template
        # Shows the model how prompt, input, and output are concatenated;
        # order depends on `instructions_first`.
        if self.instructions_first:
            full_template = f"""
{{prompt}}
{self.input_template}
{self.output_template}"""
        else:
            full_template = f"""
{self.input_template}
{{prompt}}
{self.output_template}"""

        messages += [
            {
                "role": "user",
                "content": f"""
A prompt is a text paragraph that outlines the expected actions and instructs the large language model (LLM) to \
generate a specific output. This prompt is concatenated with the input text, and the \
model then creates the required output.
This describes the full template how the prompt is concatenated with the input to produce the output:

```
{full_template}
```

Here:
- "{self.input_template}" is input template,
- "{{prompt}}" is the LLM prompt,
- "{self.output_template}" is the output template.

Model can produce erroneous output if a prompt is not well defined. \
In our collaboration, we’ll work together to refine a prompt. The process consists of two main steps:

## Step 1
I will provide you with the current prompt along with prediction examples. Each example contains the input text, the final prediction produced by the model, and the user feedback. \
User feedback indicates whether the model prediction is correct or not. \
Your task is to analyze the examples and user feedback, determining whether the \
existing instruction is describing the task reflected by these examples precisely, and suggests changes to the prompt to address the incorrect predictions.

## Step 2
Next, you will carefully review your reasoning in step 1, integrate the insights to refine the prompt, \
and provide me with the new prompt that improves the model’s performance.""",
            }
        ]

        messages += [
            {
                "role": "assistant",
                "content": "Sure, I’d be happy to help you with this prompt engineering problem. "
                "Please provide me with the current prompt and the examples with user feedback.",
            }
        ]

        messages += [
            {
                "role": "user",
                "content": f"""
## Current prompt
{self.instructions}

## Examples
{examples}

Summarize your analysis about incorrect predictions and suggest changes to the prompt.""",
            }
        ]
        # Step 1: ask the teacher model to analyze the failures.
        reasoning = runtime.get_llm_response(messages)

        messages += [
            {"role": "assistant", "content": reasoning},
            {
                "role": "user",
                "content": f"""
Now please carefully review your reasoning in Step 1 and help with Step 2: refining the prompt.

## Current prompt
{self.instructions}

## Follow this guidance to refine the prompt:

1. The new prompt should should describe the task precisely, and address the points raised in the user feedback.

2. The new prompt should be similar to the current prompt, and only differ in the parts that address the issues you identified in Step 1.
    Example:
    - Current prompt: "Generate a summary of the input text."
    - New prompt: "Generate a summary of the input text. Pay attention to the original style."

3. Reply only with the new prompt. Do not include input and output templates in the prompt.
""",
            },
        ]

        if add_cot:
            cot_instructions = """

4. In the new prompt, you should ask the model to perform step-by-step reasoning, and provide rationale or explanations for its prediction before giving the final answer. \
Instruct the model to give the final answer at the end of the prompt, using the following template: "Final answer: <answer>".
    Example:
    - Current prompt: "Generate a summary of the input text."
    - New prompt: "Generate a summary of the input text. Explain your reasoning step-by-step. Use the following template to give the final answer at the end of the prompt: "Final answer: <answer>"."""
            messages[-1]["content"] += cot_instructions
        # display dialogue:
        for message in messages:
            print(f'"{{{message["role"]}}}":\n{message["content"]}')
        # Step 2: the teacher's reply becomes the skill's new instructions.
        new_prompt = runtime.get_llm_response(messages)
        self.instructions = new_prompt

    async def aimprove(
        self,
        teacher_runtime: AsyncRuntime,
        target_input_variables: List[str],
        predictions: Optional[InternalDataFrame] = None,
        instructions: Optional[str] = None,
    ):
        """
        Improves the skill.

        Delegates to a `PromptImprovementSkill` run on the teacher runtime;
        returns an `ImprovedPromptResponse` wrapping either the improved
        prompt or error details.
        """

        # Imported lazily here — presumably to avoid a circular import at
        # module load time (TODO confirm).
        from adala.skills.collection.prompt_improvement import (
            PromptImprovementSkill,
            ImprovedPromptResponse,
            ErrorResponseModel,
            PromptImprovementSkillResponseModel,
        )

        response_dct = {}
        try:
            prompt_improvement_skill = PromptImprovementSkill(
                skill_to_improve=self,
                input_variables=target_input_variables,
                instructions=instructions,
            )
            if predictions is None:
                input_df = InternalDataFrame()
            else:
                input_df = predictions
            response_df = await prompt_improvement_skill.aapply(
                input=input_df,
                runtime=teacher_runtime,
            )

            # awkward to go from response model -> dict -> df -> dict -> response model
            response_dct = response_df.iloc[0].to_dict()

            # unflatten the response
            # A truthy "_adala_error" marker selects the error model; the pop
            # removes the marker so it is not passed to the model constructor.
            if response_dct.pop("_adala_error", False):
                output = ErrorResponseModel(**response_dct)
            else:
                output = PromptImprovementSkillResponseModel(**response_dct)

        except Exception as e:
            logger.error(
                f"Error improving skill: {e}. Traceback: {traceback.format_exc()}"
            )
            output = ErrorResponseModel(
                _adala_message=str(e),
                _adala_details=traceback.format_exc(),
            )

        # get tokens and token cost
        resp = ImprovedPromptResponse(output=output, **response_dct)
        logger.debug(f"resp: {resp}")

        return resp

aapply(input, runtime) async

Applies the skill to a dataframe and returns another dataframe.

Parameters:

Name Type Description Default
input InternalDataFrame

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The transformed data.

Source code in adala/skills/_base.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
async def aapply(
    self,
    input: InternalDataFrame,
    runtime: AsyncRuntime,
) -> InternalDataFrame:
    """
    Applies the skill to a dataframe and returns another dataframe.

    Args:
        input (InternalDataFrame): The input data to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalDataFrame: The transformed data.
    """

    return await runtime.batch_to_batch(
        input,
        input_template=self.input_template,
        output_template=self.output_template,
        instructions_template=self.instructions,
        field_schema=self.field_schema,
        extra_fields=self._get_extra_fields(),
        instructions_first=self.instructions_first,
        response_model=self.response_model,
    )

aimprove(teacher_runtime, target_input_variables, predictions=None, instructions=None) async

Improves the skill.

Source code in adala/skills/_base.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
async def aimprove(
    self,
    teacher_runtime: AsyncRuntime,
    target_input_variables: List[str],
    predictions: Optional[InternalDataFrame] = None,
    instructions: Optional[str] = None,
):
    """
    Improves the skill.

    Runs a `PromptImprovementSkill` against this skill on the teacher
    runtime and wraps the outcome (success or error) in an
    `ImprovedPromptResponse`.
    """

    from adala.skills.collection.prompt_improvement import (
        PromptImprovementSkill,
        ImprovedPromptResponse,
        ErrorResponseModel,
        PromptImprovementSkillResponseModel,
    )

    response_dct = {}
    try:
        improver = PromptImprovementSkill(
            skill_to_improve=self,
            input_variables=target_input_variables,
            instructions=instructions,
        )
        # No predictions supplied -> run the improver on an empty frame.
        frame = InternalDataFrame() if predictions is None else predictions
        result_df = await improver.aapply(
            input=frame,
            runtime=teacher_runtime,
        )

        # awkward to go from response model -> dict -> df -> dict -> response model
        response_dct = result_df.iloc[0].to_dict()

        # unflatten the response
        had_error = response_dct.pop("_adala_error", False)
        output = (
            ErrorResponseModel(**response_dct)
            if had_error
            else PromptImprovementSkillResponseModel(**response_dct)
        )

    except Exception as e:
        logger.error(
            f"Error improving skill: {e}. Traceback: {traceback.format_exc()}"
        )
        output = ErrorResponseModel(
            _adala_message=str(e),
            _adala_details=traceback.format_exc(),
        )

    # get tokens and token cost
    resp = ImprovedPromptResponse(output=output, **response_dct)
    logger.debug(f"resp: {resp}")

    return resp

apply(input, runtime)

Applies the skill to a dataframe and returns another dataframe.

Parameters:

Name Type Description Default
input InternalDataFrame

The input data to be processed.

required
runtime Runtime

The runtime instance to be used for processing.

required

Returns:

Name Type Description
InternalDataFrame InternalDataFrame

The transformed data.

Source code in adala/skills/_base.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def apply(
    self,
    input: InternalDataFrame,
    runtime: Runtime,
) -> InternalDataFrame:
    """
    Applies the skill to a dataframe and returns another dataframe.

    Args:
        input (InternalDataFrame): The input data to be processed.
        runtime (Runtime): The runtime instance to be used for processing.

    Returns:
        InternalDataFrame: The transformed data.
    """

    return runtime.batch_to_batch(
        input,
        input_template=self.input_template,
        output_template=self.output_template,
        instructions_template=self.instructions,
        field_schema=self.field_schema,
        extra_fields=self._get_extra_fields(),
        instructions_first=self.instructions_first,
        response_model=self.response_model,
    )

improve(predictions, train_skill_output, feedback, runtime, add_cot=False)

Improves the skill.

Parameters:

Name Type Description Default
predictions InternalDataFrame

The predictions made by the skill.

required
train_skill_output str

The name of the output field of the skill.

required
feedback InternalDataFrame

The feedback provided by the user.

required
runtime Runtime

The runtime instance to be used for processing (CURRENTLY SUPPORTS ONLY OpenAIChatRuntime).

required
add_cot bool

Flag indicating whether the skill should use the Chain-of-Thought strategy. Defaults to False.

False
Source code in adala/skills/_base.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
    def improve(
        self,
        predictions: InternalDataFrame,
        train_skill_output: str,
        feedback,
        runtime: Runtime,
        add_cot: bool = False,
    ):
        """
        Improves the skill by asking an LLM to refine `self.instructions`
        based on prediction examples and user feedback.

        Args:
            predictions (InternalDataFrame): The predictions made by the skill.
            train_skill_output (str): The name of the output field of the skill.
            feedback (InternalDataFrame): The feedback provided by the user.
            runtime (Runtime): The runtime instance to be used for processing (CURRENTLY SUPPORTS ONLY `OpenAIChatRuntime`).
            add_cot (bool): Flag indicating whether the skill should use the Chain-of-Thought strategy. Defaults to False.
        """
        if feedback.feedback[train_skill_output].isna().all():
            # No feedback left - nothing to improve
            return

        if feedback.match[train_skill_output].all():
            # all feedback is "correct" - nothing to improve
            return

        # Suffix feedback columns that collide with prediction columns so the
        # index-aligned merge below keeps both versions.
        fb = feedback.feedback.rename(
            columns=lambda x: x + "__fb" if x in predictions.columns else x
        )
        analyzed_df = fb.merge(predictions, left_index=True, right_index=True)

        examples = []

        for i, row in enumerate(analyzed_df.to_dict(orient="records")):
            # if fb marked as NaN, skip
            # NOTE(review): this falsy check skips None/empty-string feedback,
            # but float("nan") is truthy, so NaN feedback values would NOT be
            # skipped here — confirm whether NaN rows can reach this point.
            if not row[f"{train_skill_output}__fb"]:
                continue

            # TODO: self.output_template can be missed or incompatible with the field_schema
            # we need to redefine how we create examples for learn()
            if not self.output_template:
                raise ValueError(
                    "`output_template` is required for improve() method and must contain "
                    "the output fields from `field_schema`"
                )
            examples.append(
                f"### Example #{i}\n\n"
                f"{partial_str_format(self.input_template, **row)}\n\n"
                f"{partial_str_format(self.output_template, **row)}\n\n"
                f'User feedback: {row[f"{train_skill_output}__fb"]}\n\n'
            )

        examples = "\n".join(examples)

        messages = [{"role": "system", "content": "You are a helpful assistant."}]

        # full template
        # Shows the model how prompt, input, and output are concatenated;
        # order depends on `instructions_first`.
        if self.instructions_first:
            full_template = f"""
{{prompt}}
{self.input_template}
{self.output_template}"""
        else:
            full_template = f"""
{self.input_template}
{{prompt}}
{self.output_template}"""

        messages += [
            {
                "role": "user",
                "content": f"""
A prompt is a text paragraph that outlines the expected actions and instructs the large language model (LLM) to \
generate a specific output. This prompt is concatenated with the input text, and the \
model then creates the required output.
This describes the full template how the prompt is concatenated with the input to produce the output:

```
{full_template}
```

Here:
- "{self.input_template}" is input template,
- "{{prompt}}" is the LLM prompt,
- "{self.output_template}" is the output template.

Model can produce erroneous output if a prompt is not well defined. \
In our collaboration, we’ll work together to refine a prompt. The process consists of two main steps:

## Step 1
I will provide you with the current prompt along with prediction examples. Each example contains the input text, the final prediction produced by the model, and the user feedback. \
User feedback indicates whether the model prediction is correct or not. \
Your task is to analyze the examples and user feedback, determining whether the \
existing instruction is describing the task reflected by these examples precisely, and suggests changes to the prompt to address the incorrect predictions.

## Step 2
Next, you will carefully review your reasoning in step 1, integrate the insights to refine the prompt, \
and provide me with the new prompt that improves the model’s performance.""",
            }
        ]

        messages += [
            {
                "role": "assistant",
                "content": "Sure, I’d be happy to help you with this prompt engineering problem. "
                "Please provide me with the current prompt and the examples with user feedback.",
            }
        ]

        messages += [
            {
                "role": "user",
                "content": f"""
## Current prompt
{self.instructions}

## Examples
{examples}

Summarize your analysis about incorrect predictions and suggest changes to the prompt.""",
            }
        ]
        # Step 1: ask the teacher model to analyze the failures.
        reasoning = runtime.get_llm_response(messages)

        messages += [
            {"role": "assistant", "content": reasoning},
            {
                "role": "user",
                "content": f"""
Now please carefully review your reasoning in Step 1 and help with Step 2: refining the prompt.

## Current prompt
{self.instructions}

## Follow this guidance to refine the prompt:

1. The new prompt should should describe the task precisely, and address the points raised in the user feedback.

2. The new prompt should be similar to the current prompt, and only differ in the parts that address the issues you identified in Step 1.
    Example:
    - Current prompt: "Generate a summary of the input text."
    - New prompt: "Generate a summary of the input text. Pay attention to the original style."

3. Reply only with the new prompt. Do not include input and output templates in the prompt.
""",
            },
        ]

        if add_cot:
            cot_instructions = """

4. In the new prompt, you should ask the model to perform step-by-step reasoning, and provide rationale or explanations for its prediction before giving the final answer. \
Instruct the model to give the final answer at the end of the prompt, using the following template: "Final answer: <answer>".
    Example:
    - Current prompt: "Generate a summary of the input text."
    - New prompt: "Generate a summary of the input text. Explain your reasoning step-by-step. Use the following template to give the final answer at the end of the prompt: "Final answer: <answer>"."""
            messages[-1]["content"] += cot_instructions
        # display dialogue:
        for message in messages:
            print(f'"{{{message["role"]}}}":\n{message["content"]}')
        # Step 2: the teacher's reply becomes the skill's new instructions.
        new_prompt = runtime.get_llm_response(messages)
        self.instructions = new_prompt

LinearSkillSet

Bases: SkillSet

Represents a sequence of skills that are acquired in a specific order to achieve a goal.

LinearSkillSet ensures that skills are applied in a sequential manner.

Attributes:

Name Type Description
skills Union[List[Skill], Dict[str, Skill]]

Provided skills

skill_sequence List[str]

Ordered list of skill names indicating the order in which they should be acquired.

Examples:

Create a LinearSkillSet with a list of skills specified as BaseSkill instances:
>>> from adala.skills import LinearSkillSet, TransformSkill, AnalysisSkill, ClassificationSkill
>>> skillset = LinearSkillSet(skills=[TransformSkill(), ClassificationSkill(), AnalysisSkill()])
Source code in adala/skills/skillset.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class LinearSkillSet(SkillSet):
    """
    Represents a sequence of skills that are acquired in a specific order to achieve a goal.

    LinearSkillSet ensures that skills are applied in a sequential manner.

    Attributes:
        skills (Union[List[Skill], Dict[str, Skill]]): Provided skills
        skill_sequence (List[str], optional): Ordered list of skill names indicating the order
                                              in which they should be acquired.

    Examples:

        Create a LinearSkillSet with a list of skills specified as BaseSkill instances:
        >>> from adala.skills import LinearSkillSet, TransformSkill, AnalysisSkill, ClassificationSkill
        >>> skillset = LinearSkillSet(skills=[TransformSkill(), ClassificationSkill(), AnalysisSkill()])
    """

    # None means "derive the order from `skills`"; filled in by the validator below.
    skill_sequence: List[str] = None

    @model_validator(mode="after")
    def skill_sequence_validator(self) -> "LinearSkillSet":
        """
        Validates and sets the default order for the skill sequence if not provided.

        Returns:
            LinearSkillSet: The current instance with updated skill_sequence attribute.
        """
        if self.skill_sequence is None:
            # default to the skills' storage order (dict insertion order)
            self.skill_sequence = list(self.skills.keys())
        if len(self.skill_sequence) != len(self.skills):
            raise ValueError(
                f"skill_sequence must contain all skill names - "
                f"length of skill_sequence is {len(self.skill_sequence)} "
                f"while length of skills is {len(self.skills)}"
            )
        return self

    def apply(
        self,
        input: Union[Record, InternalDataFrame],
        runtime: Runtime,
        improved_skill: Optional[str] = None,
    ) -> InternalDataFrame:
        """
        Sequentially applies each skill on the dataset.

        Args:
            input (InternalDataFrame): Input dataset.
            runtime (Runtime): The runtime environment in which to apply the skills.
            improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None.
        Returns:
            InternalDataFrame: Skill predictions.
        """
        if improved_skill:
            # start from the specified skill, assuming previous skills have already been applied
            skill_sequence = self.skill_sequence[
                self.skill_sequence.index(improved_skill) :
            ]
        else:
            skill_sequence = self.skill_sequence
        skill_input = input
        for i, skill_name in enumerate(skill_sequence):
            skill = self.skills[skill_name]
            # use input dataset for the first node in the pipeline
            print_text(f"Applying skill: {skill_name}")
            skill_output = skill.apply(skill_input, runtime)
            print_dataframe(skill_output)
            if isinstance(skill, TransformSkill):
                # Columns to drop from skill_input because they are also in skill_output
                cols_to_drop = set(skill_output.columns) & set(skill_input.columns)
                skill_input_reduced = skill_input.drop(columns=cols_to_drop)

                # Transform skills augment the running dataframe row-by-row.
                skill_input = skill_input_reduced.merge(
                    skill_output, left_index=True, right_index=True, how="inner"
                )
            elif isinstance(skill, (AnalysisSkill, SynthesisSkill)):
                # Analysis/synthesis skills replace the running data entirely.
                skill_input = skill_output
            else:
                raise ValueError(f"Unsupported skill type: {type(skill)}")
        if isinstance(skill_input, InternalSeries):
            # Normalize a single-record result back into a one-row dataframe.
            skill_input = skill_input.to_frame().T
        return skill_input

    async def aapply(
        self,
        input: Union[Record, InternalDataFrame],
        runtime: AsyncRuntime,
        improved_skill: Optional[str] = None,
    ) -> InternalDataFrame:
        """
        Sequentially and asynchronously applies each skill on the dataset.

        Async counterpart of `apply` with identical pipeline semantics.

        Args:
            input (InternalDataFrame): Input dataset.
            runtime (AsyncRuntime): The runtime environment in which to apply the skills.
            improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None.
        Returns:
            InternalDataFrame: Skill predictions.
        """
        if improved_skill:
            # start from the specified skill, assuming previous skills have already been applied
            skill_sequence = self.skill_sequence[
                self.skill_sequence.index(improved_skill) :
            ]
        else:
            skill_sequence = self.skill_sequence
        skill_input = input
        for i, skill_name in enumerate(skill_sequence):
            skill = self.skills[skill_name]
            # use input dataset for the first node in the pipeline
            print_text(f"Applying skill: {skill_name}")
            skill_output = await skill.aapply(skill_input, runtime)
            print_dataframe(skill_output)
            if isinstance(skill, TransformSkill):
                # Columns to drop from skill_input because they are also in skill_output
                cols_to_drop = set(skill_output.columns) & set(skill_input.columns)
                skill_input_reduced = skill_input.drop(columns=cols_to_drop)

                skill_input = skill_input_reduced.merge(
                    skill_output, left_index=True, right_index=True, how="inner"
                )
            elif isinstance(skill, (AnalysisSkill, SynthesisSkill)):
                skill_input = skill_output
            else:
                raise ValueError(f"Unsupported skill type: {type(skill)}")
        if isinstance(skill_input, InternalSeries):
            skill_input = skill_input.to_frame().T
        return skill_input

    def __rich__(self):
        """Returns a rich representation of the skill."""
        # TODO: move it to a base class and use repr derived from Skills
        text = f"[bold blue]Total Agent Skills: {len(self.skills)}[/bold blue]\n\n"
        for skill in self.skills.values():
            text += (
                f"[bold underline green]{skill.name}[/bold underline green]\n"
                # NOTE(review): the closing tag below is "[green]" rather than
                # "[/green]" — this opens a second green span instead of
                # closing the first; likely a rich-markup typo, confirm.
                f"[green]{skill.instructions}[green]\n"
            )
        return text

__rich__()

Returns a rich representation of the skill.

Source code in adala/skills/skillset.py
267
268
269
270
271
272
273
274
275
276
def __rich__(self):
    """Returns a rich representation of the skill.

    Lists the total number of skills, then each skill's name and
    instructions with rich console markup.
    """
    # TODO: move it to a base class and use repr derived from Skills
    text = f"[bold blue]Total Agent Skills: {len(self.skills)}[/bold blue]\n\n"
    for skill in self.skills.values():
        text += (
            f"[bold underline green]{skill.name}[/bold underline green]\n"
            # Bug fix: close the green span with "[/green]" — the previous
            # "[green]" opened a second span instead of closing the first,
            # leaving unbalanced rich markup.
            f"[green]{skill.instructions}[/green]\n"
        )
    return text

aapply(input, runtime, improved_skill=None) async

Sequentially and asynchronously applies each skill on the dataset.

Parameters:

Name Type Description Default
input InternalDataFrame

Input dataset.

required
runtime AsyncRuntime

The runtime environment in which to apply the skills.

required
improved_skill Optional[str]

Name of the skill to improve. Defaults to None.

None

Returns: InternalDataFrame: Skill predictions.

Source code in adala/skills/skillset.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
async def aapply(
    self,
    input: Union[Record, InternalDataFrame],
    runtime: AsyncRuntime,
    improved_skill: Optional[str] = None,
) -> InternalDataFrame:
    """
    Sequentially and asynchronously applies each skill on the dataset.

    Args:
        input (Union[Record, InternalDataFrame]): Input dataset.
        runtime (AsyncRuntime): The runtime environment in which to apply the skills.
        improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None.
    Returns:
        InternalDataFrame: Skill predictions.
    """
    if improved_skill:
        # start from the specified skill, assuming previous skills have already been applied
        skill_sequence = self.skill_sequence[
            self.skill_sequence.index(improved_skill) :
        ]
    else:
        skill_sequence = self.skill_sequence
    skill_input = input
    # each skill consumes the previous skill's output (pipeline)
    for skill_name in skill_sequence:
        skill = self.skills[skill_name]
        print_text(f"Applying skill: {skill_name}")
        skill_output = await skill.aapply(skill_input, runtime)
        print_dataframe(skill_output)
        if isinstance(skill, TransformSkill):
            # drop columns the skill re-emitted so the merge doesn't duplicate them
            cols_to_drop = set(skill_output.columns) & set(skill_input.columns)
            skill_input_reduced = skill_input.drop(columns=cols_to_drop)

            skill_input = skill_input_reduced.merge(
                skill_output, left_index=True, right_index=True, how="inner"
            )
        elif isinstance(skill, (AnalysisSkill, SynthesisSkill)):
            # analysis/synthesis skills replace the pipeline payload entirely
            skill_input = skill_output
        else:
            raise ValueError(f"Unsupported skill type: {type(skill)}")
    if isinstance(skill_input, InternalSeries):
        # normalize a single-record Series into a one-row DataFrame
        skill_input = skill_input.to_frame().T
    return skill_input

apply(input, runtime, improved_skill=None)

Sequentially applies each skill on the dataset.

Parameters:

Name Type Description Default
input InternalDataFrame

Input dataset.

required
runtime Runtime

The runtime environment in which to apply the skills.

required
improved_skill Optional[str]

Name of the skill to improve. Defaults to None.

None

Returns: InternalDataFrame: Skill predictions.

Source code in adala/skills/skillset.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def apply(
    self,
    input: Union[Record, InternalDataFrame],
    runtime: Runtime,
    improved_skill: Optional[str] = None,
) -> InternalDataFrame:
    """
    Sequentially applies each skill on the dataset.

    Args:
        input (Union[Record, InternalDataFrame]): Input dataset.
        runtime (Runtime): The runtime environment in which to apply the skills.
        improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None.
    Returns:
        InternalDataFrame: Skill predictions.
    """
    if improved_skill:
        # start from the specified skill, assuming previous skills have already been applied
        skill_sequence = self.skill_sequence[
            self.skill_sequence.index(improved_skill) :
        ]
    else:
        skill_sequence = self.skill_sequence
    skill_input = input
    # each skill consumes the previous skill's output (pipeline)
    for skill_name in skill_sequence:
        skill = self.skills[skill_name]
        print_text(f"Applying skill: {skill_name}")
        skill_output = skill.apply(skill_input, runtime)
        print_dataframe(skill_output)
        if isinstance(skill, TransformSkill):
            # drop columns the skill re-emitted so the merge doesn't duplicate them
            cols_to_drop = set(skill_output.columns) & set(skill_input.columns)
            skill_input_reduced = skill_input.drop(columns=cols_to_drop)

            skill_input = skill_input_reduced.merge(
                skill_output, left_index=True, right_index=True, how="inner"
            )
        elif isinstance(skill, (AnalysisSkill, SynthesisSkill)):
            # analysis/synthesis skills replace the pipeline payload entirely
            skill_input = skill_output
        else:
            raise ValueError(f"Unsupported skill type: {type(skill)}")
    if isinstance(skill_input, InternalSeries):
        # normalize a single-record Series into a one-row DataFrame
        skill_input = skill_input.to_frame().T
    return skill_input

skill_sequence_validator()

Validates and sets the default order for the skill sequence if not provided.

Returns:

Name Type Description
LinearSkillSet LinearSkillSet

The current instance with updated skill_sequence attribute.

Source code in adala/skills/skillset.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@model_validator(mode="after")
def skill_sequence_validator(self) -> "LinearSkillSet":
    """
    Validates and sets the default order for the skill sequence if not provided.

    Returns:
        LinearSkillSet: The current instance with updated skill_sequence attribute.

    Raises:
        ValueError: If the provided skill_sequence length does not match the
            number of skills.
    """
    if self.skill_sequence is None:
        # default to the skills' insertion order (dict keys preserve it);
        # NOTE(review): an earlier comment said "lexicographical order", but
        # list(dict.keys()) yields insertion order — confirm which is intended
        self.skill_sequence = list(self.skills.keys())
    if len(self.skill_sequence) != len(self.skills):
        # only lengths are compared; a sequence of the right length but with
        # wrong names would pass — callers must supply valid skill names
        raise ValueError(
            f"skill_sequence must contain all skill names - "
            f"length of skill_sequence is {len(self.skill_sequence)} "
            f"while length of skills is {len(self.skills)}"
        )
    return self

ParallelSkillSet

Bases: SkillSet

Represents a set of skills that are acquired simultaneously to reach a goal.

In a ParallelSkillSet, each skill can be developed independently of the others. This is useful for agents that require multiple, diverse capabilities, or tasks where each skill contributes a piece of the overall solution.

Examples:

Create a ParallelSkillSet with a list of skills specified as BaseSkill instances

>>> from adala.skills import ParallelSkillSet, ClassificationSkill, TransformSkill
>>> skillset = ParallelSkillSet(skills=[ClassificationSkill(), TransformSkill()])
Source code in adala/skills/skillset.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
class ParallelSkillSet(SkillSet):
    """
    Represents a set of skills that are acquired simultaneously to reach a goal.

    In a ParallelSkillSet, each skill can be developed independently of the others. This is useful
    for agents that require multiple, diverse capabilities, or tasks where each skill contributes a piece of
    the overall solution.

    Examples:
        Create a ParallelSkillSet with a list of skills specified as BaseSkill instances
        >>> from adala.skills import ParallelSkillSet, ClassificationSkill, TransformSkill
        >>> skillset = ParallelSkillSet(skills=[ClassificationSkill(), TransformSkill()])
    """

    def apply(
        self,
        input: Union[InternalSeries, InternalDataFrame],
        runtime: Runtime,
        improved_skill: Optional[str] = None,
    ) -> InternalDataFrame:
        """
        Applies each skill on the dataset, enhancing the agent's experience.

        Args:
            input (Union[InternalSeries, InternalDataFrame]): Input data
            runtime (Runtime): The runtime environment in which to apply the skills.
            improved_skill (Optional[str], optional): Unused in ParallelSkillSet. Defaults to None.
        Returns:
            Union[Record, InternalDataFrame]: Skill predictions.
        """
        if improved_skill:
            # apply only the specified skill, assuming previous skills have already been applied
            skill_sequence = [improved_skill]
        else:
            skill_sequence = list(self.skills.keys())

        skill_outputs = []
        for skill_name in skill_sequence:
            skill = self.skills[skill_name]
            print_text(f"Applying skill: {skill_name}")
            skill_outputs.append(skill.apply(input, runtime))
        if not skill_outputs:
            return InternalDataFrame()
        if isinstance(skill_outputs[0], InternalDataFrame):
            # combine all skill outputs side by side, then merge back onto input
            skill_outputs = InternalDataFrameConcat(skill_outputs, axis=1)
            # NOTE(review): this branch assumes `input` is a DataFrame
            # (`.columns`); an InternalSeries input would fail here — confirm
            cols_to_drop = set(input.columns) & set(skill_outputs.columns)
            skill_input_reduced = input.drop(columns=cols_to_drop)

            return skill_input_reduced.merge(
                skill_outputs, left_index=True, right_index=True, how="inner"
            )
        elif isinstance(skill_outputs[0], (dict, InternalSeries)):
            # broadcast the single record output to each row of the input;
            # only the first skill's record output is propagated (original behavior)
            output = skill_outputs[0]
            # fixed: dicts have no `.index` attribute — take column names
            # from the dict keys; Series keep using `.index`
            columns = list(output.keys()) if isinstance(output, dict) else output.index
            return InternalDataFrameConcat(
                [
                    input,
                    InternalDataFrame(
                        [output] * len(input),
                        columns=columns,
                        index=input.index,
                    ),
                ],
                axis=1,
            )
        else:
            raise ValueError(f"Unsupported output type: {type(skill_outputs[0])}")

apply(input, runtime, improved_skill=None)

Applies each skill on the dataset, enhancing the agent's experience.

Parameters:

Name Type Description Default
input Union[Record, InternalDataFrame]

Input data

required
runtime Runtime

The runtime environment in which to apply the skills.

required
improved_skill Optional[str]

Unused in ParallelSkillSet. Defaults to None.

None

Returns: Union[Record, InternalDataFrame]: Skill predictions.

Source code in adala/skills/skillset.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def apply(
    self,
    input: Union[InternalSeries, InternalDataFrame],
    runtime: Runtime,
    improved_skill: Optional[str] = None,
) -> InternalDataFrame:
    """
    Applies each skill on the dataset, enhancing the agent's experience.

    Args:
        input (Union[InternalSeries, InternalDataFrame]): Input data
        runtime (Runtime): The runtime environment in which to apply the skills.
        improved_skill (Optional[str], optional): Unused in ParallelSkillSet. Defaults to None.
    Returns:
        Union[Record, InternalDataFrame]: Skill predictions.
    """
    if improved_skill:
        # apply only the specified skill, assuming previous skills have already been applied
        skill_sequence = [improved_skill]
    else:
        skill_sequence = list(self.skills.keys())

    skill_outputs = []
    for skill_name in skill_sequence:
        skill = self.skills[skill_name]
        print_text(f"Applying skill: {skill_name}")
        skill_outputs.append(skill.apply(input, runtime))
    if not skill_outputs:
        return InternalDataFrame()
    if isinstance(skill_outputs[0], InternalDataFrame):
        # combine all skill outputs side by side, then merge back onto input
        skill_outputs = InternalDataFrameConcat(skill_outputs, axis=1)
        # NOTE(review): this branch assumes `input` is a DataFrame
        # (`.columns`); an InternalSeries input would fail here — confirm
        cols_to_drop = set(input.columns) & set(skill_outputs.columns)
        skill_input_reduced = input.drop(columns=cols_to_drop)

        return skill_input_reduced.merge(
            skill_outputs, left_index=True, right_index=True, how="inner"
        )
    elif isinstance(skill_outputs[0], (dict, InternalSeries)):
        # broadcast the single record output to each row of the input;
        # only the first skill's record output is propagated (original behavior)
        output = skill_outputs[0]
        # fixed: dicts have no `.index` attribute — take column names
        # from the dict keys; Series keep using `.index`
        columns = list(output.keys()) if isinstance(output, dict) else output.index
        return InternalDataFrameConcat(
            [
                input,
                InternalDataFrame(
                    [output] * len(input),
                    columns=columns,
                    index=input.index,
                ),
            ],
            axis=1,
        )
    else:
        raise ValueError(f"Unsupported output type: {type(skill_outputs[0])}")

SkillSet

Bases: BaseModel, ABC

Represents a collection of interdependent skills aiming to achieve a specific goal.

A skill set breaks down the path to achieve a goal into necessary precursor skills. Agents can evolve these skills either in parallel for tasks like self-consistency or sequentially for complex problem decompositions and causal reasoning. In the most generic cases, task decomposition can involve a graph-based approach.

Attributes:

Name Type Description
skills Dict[str, Skill]

A dictionary of skills in the skill set.

Source code in adala/skills/skillset.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class SkillSet(BaseModel, ABC):
    """
    Represents a collection of interdependent skills aiming to achieve a specific goal.

    A skill set breaks down the path to achieve a goal into necessary precursor skills.
    Agents can evolve these skills either in parallel for tasks like self-consistency or
    sequentially for complex problem decompositions and causal reasoning. In the most generic
    cases, task decomposition can involve a graph-based approach.

    Attributes:
        skills (Dict[str, Skill]): A dictionary of skills in the skill set.
    """

    skills: Union[List, Dict[str, Skill]]

    @field_validator("skills", mode="before")
    def skills_validator(cls, v: Union[List, Dict]) -> Dict[str, Skill]:
        """
        Validates and converts the skills attribute to a dictionary of skill names to BaseSkill instances.

        Args:
            v (Union[List[Skill], Dict[str, Skill]]): The skills attribute to validate and convert.

        Returns:
            Dict[str, BaseSkill]: Dictionary mapping skill names to their corresponding BaseSkill instances.

        Raises:
            ValueError: If a skill dictionary lacks a 'type' key, or if v is
                neither a list nor a dictionary.
        """
        skills = OrderedDict()
        if not v:
            return skills

        elif isinstance(v, list):
            if isinstance(v[0], Skill):
                # convert list of Skill instances to a name -> skill mapping
                for skill in v:
                    skills[skill.name] = skill
            elif isinstance(v[0], dict):
                # convert list of skill dictionaries to a name -> skill mapping
                for skill in v:
                    if "type" not in skill:
                        raise ValueError("Skill dictionary must contain a 'type' key")
                    # fixed: build kwargs without 'type' instead of popping it,
                    # so the caller's dictionary is not mutated as a side effect
                    skill_kwargs = {k: val for k, val in skill.items() if k != "type"}
                    skills[skill["name"]] = Skill.create_from_registry(
                        skill["type"], **skill_kwargs
                    )
        elif isinstance(v, dict):
            skills = v
        else:
            raise ValueError(
                f"skills must be a list or dictionary, but received type {type(v)}"
            )
        return skills

    @abstractmethod
    def apply(
        self,
        input: Union[Record, InternalDataFrame],
        runtime: Runtime,
        improved_skill: Optional[str] = None,
    ) -> InternalDataFrame:
        """
        Apply the skill set to a dataset using a specified runtime.

        Args:
            input (Union[Record, InternalDataFrame]): Input data to apply the skill set to.
            runtime (Runtime): The runtime environment in which to apply the skills.
            improved_skill (Optional[str], optional): Name of the skill to start from (to optimize calculations). Defaults to None.
        Returns:
            InternalDataFrame: Skill predictions.
        """

    def __getitem__(self, skill_name: str) -> Skill:
        """
        Select a skill by name.

        Args:
            skill_name (str): Name of the skill to select.

        Returns:
            Skill: The skill registered under ``skill_name``.

        Raises:
            KeyError: If no skill with the given name exists.
        """
        return self.skills[skill_name]

    def __setitem__(self, skill_name: str, skill: Skill):
        """
        Set a skill by name.

        Args:
            skill_name (str): Name of the skill to set.
            skill (Skill): Skill to store under ``skill_name``.
        """
        self.skills[skill_name] = skill

    def get_skill_names(self) -> List[str]:
        """
        Get list of skill names.

        Returns:
            List[str]: List of skill names, in registration order.
        """
        return list(self.skills.keys())

    def get_skill_outputs(self) -> Dict[str, str]:
        """
        Get dictionary of skill outputs.

        Returns:
            Dict[str, str]: Dictionary of skill outputs. Keys are output names and values are skill names
        """
        return {
            field: skill.name
            for skill in self.skills.values()
            for field in skill.get_output_fields()
        }

__getitem__(skill_name)

Select skill by name.

Parameters:

Name Type Description Default
skill_name str

Name of the skill to select.

required

Returns:

Name Type Description
BaseSkill Skill

Skill

Source code in adala/skills/skillset.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __getitem__(self, skill_name: str) -> Skill:
    """
    Select a skill by name.

    Args:
        skill_name (str): Name of the skill to select.

    Returns:
        Skill: The skill registered under ``skill_name``.

    Raises:
        KeyError: If no skill with the given name exists.
    """
    return self.skills[skill_name]

__setitem__(skill_name, skill)

Set skill by name.

Parameters:

Name Type Description Default
skill_name str

Name of the skill to set.

required
skill BaseSkill

Skill to set.

required
Source code in adala/skills/skillset.py
103
104
105
106
107
108
109
110
111
def __setitem__(self, skill_name: str, skill: Skill):
    """
    Set a skill by name.

    Args:
        skill_name (str): Name of the skill to set.
        skill (Skill): Skill to store under ``skill_name``.
    """
    self.skills[skill_name] = skill

apply(input, runtime, improved_skill=None) abstractmethod

Apply the skill set to a dataset using a specified runtime.

Parameters:

Name Type Description Default
input Union[Record, InternalDataFrame]

Input data to apply the skill set to.

required
runtime Runtime

The runtime environment in which to apply the skills.

required
improved_skill Optional[str]

Name of the skill to start from (to optimize calculations). Defaults to None.

None

Returns: InternalDataFrame: Skill predictions.

Source code in adala/skills/skillset.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@abstractmethod
def apply(
    self,
    input: Union[Record, InternalDataFrame],
    runtime: Runtime,
    improved_skill: Optional[str] = None,
) -> InternalDataFrame:
    """
    Apply the skill set to a dataset using a specified runtime.

    Concrete subclasses define the execution strategy (e.g. sequential or
    parallel application of the skills).

    Args:
        input (Union[Record, InternalDataFrame]): Input data to apply the skill set to.
        runtime (Runtime): The runtime environment in which to apply the skills.
        improved_skill (Optional[str], optional): Name of the skill to start from (to optimize calculations). Defaults to None.
    Returns:
        InternalDataFrame: Skill predictions.
    """

get_skill_names()

Get list of skill names.

Returns:

Type Description
List[str]

List[str]: List of skill names.

Source code in adala/skills/skillset.py
113
114
115
116
117
118
119
120
def get_skill_names(self) -> List[str]:
    """
    Return the names of all skills in this skill set.

    Returns:
        List[str]: Skill names, in registration order.
    """
    return [skill_name for skill_name in self.skills]

get_skill_outputs()

Get dictionary of skill outputs.

Returns:

Type Description
Dict[str, str]

Dict[str, str]: Dictionary of skill outputs. Keys are output names and values are skill names

Source code in adala/skills/skillset.py
122
123
124
125
126
127
128
129
130
131
132
133
def get_skill_outputs(self) -> Dict[str, str]:
    """
    Map every output field to the skill that produces it.

    Returns:
        Dict[str, str]: Keys are output field names, values are the names of
            the skills that emit them.
    """
    outputs = {}
    for skill in self.skills.values():
        for field in skill.get_output_fields():
            outputs[field] = skill.name
    return outputs

skills_validator(v)

Validates and converts the skills attribute to a dictionary of skill names to BaseSkill instances.

Parameters:

Name Type Description Default
v Union[List[Skill], Dict[str, Skill]]

The skills attribute to validate and convert.

required

Returns:

Type Description
Dict[str, Skill]

Dict[str, BaseSkill]: Dictionary mapping skill names to their corresponding BaseSkill instances.

Source code in adala/skills/skillset.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@field_validator("skills", mode="before")
def skills_validator(cls, v: Union[List, Dict]) -> Dict[str, Skill]:
    """
    Validates and converts the skills attribute to a dictionary of skill names to BaseSkill instances.

    Args:
        v (Union[List[Skill], Dict[str, Skill]]): The skills attribute to validate and convert.

    Returns:
        Dict[str, BaseSkill]: Dictionary mapping skill names to their corresponding BaseSkill instances.
    """
    skills = OrderedDict()
    if not v:
        return skills

    elif isinstance(v, list):
        if isinstance(v[0], Skill):
            # convert list of skill names to dictionary
            for skill in v:
                skills[skill.name] = skill
        elif isinstance(v[0], dict):
            # convert list of skill dictionaries to dictionary
            for skill in v:
                if "type" not in skill:
                    raise ValueError("Skill dictionary must contain a 'type' key")
                skills[skill["name"]] = Skill.create_from_registry(
                    skill.pop("type"), **skill
                )
    elif isinstance(v, dict):
        skills = v
    else:
        raise ValueError(
            f"skills must be a list or dictionary, but received type {type(v)}"
        )
    return skills