Skip to content

Audio Dashboard Table

airtable_apply_annotations.audio_dashboard_table

AudioDashboardTable

Bases: AirTableS3Integration

Source code in src/airtable_apply_annotations/audio_dashboard_table.py
class AudioDashboardTable(AirTableS3Integration):
    """AirTable/S3 integration for the audio dashboard table.

    Applies each record's category verdict to the audio file stored under
    `categorisation/raw/{language}` and builds the payload that marks
    processed records' "AWS" column as `True`.
    """

    def __init__(self, airtable_url: str, filter_formula: str, headers: Dict[str, str]):
        """Constructor for the `AudioDashboardTable` class.

        All arguments are forwarded unchanged to the parent constructor.

        Args:
            airtable_url (str): URL endpoint to AirTable table.
            filter_formula (str): Additional GET URL filter formula parameter.
            headers (Dict[str, str]): API Header containing authorization.
        """
        super().__init__(airtable_url, filter_formula, headers)

    def _apply_annotation_changes_s3(self, record: Dict[str, Any]) -> None:
        """Applies changes in an S3 directory based on an AirTable `record`'s category verdict.

        A `"delete"` verdict (case-insensitive) removes the audio file from
        `categorisation/raw/{language}`. Any other verdict moves the audio to
        `categorisation/{category}/{language}` and writes the transcript next
        to it as `{Job Name}.txt`.

        Args:
            record (Dict[str, Any]): An AirTable record/row. Its `"fields"`
                must contain `"Language"`, `"Category"` and `"Audio"`;
                `"Job Name"` and `"Transcript"` are only required when the
                verdict is not `"delete"`.

        Raises:
            KeyError: If a field required for the record's verdict is missing.
        """
        fields = record["fields"]

        language = fields["Language"]
        category = fields["Category"].lower()
        audio_filename = fields["Audio"][0]["filename"]
        source_path = f"categorisation/raw/{language}"

        if category == "delete":
            # Deleting needs neither the transcript nor the job name. AirTable
            # omits empty fields from records, so reading them eagerly would
            # make deletion fail for audio that has no transcript.
            delete_file(self.bucket, audio_filename, source_path)
            return

        # Read the remaining fields before touching S3, so a missing field
        # fails the record before the audio has been moved.
        job_name, transcript = fields["Job Name"], fields["Transcript"]
        save_path = f"categorisation/{category}/{language}"

        move_file(self.bucket, audio_filename, source_path, save_path)
        write_file(self.bucket, transcript, save_path, f"{job_name}.txt")

    def _finalize_records(self, records: List[Dict[str, Any]]) -> str:
        """Finalizes audio records by marking "AWS" column as `True`.

        Args:
            records (List[Dict[str, Any]]): AirTable records; only each
                record's `"id"` is used.

        Returns:
            str: JSON-encoded payload of the form
                `{"records": [{"id": ..., "fields": {"AWS": true}}, ...]}`.
        """
        return json.dumps(
            {
                "records": [
                    {"id": record["id"], "fields": {"AWS": True}} for record in records
                ]
            }
        )

__init__(airtable_url, filter_formula, headers)

Constructor for the AudioDashboardTable class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `airtable_url` | `str` | URL endpoint to AirTable table. | required |
| `filter_formula` | `str` | Additional GET URL filter formula parameter. | required |
| `headers` | `Dict[str, str]` | API Header containing authorization. | required |

Source code in src/airtable_apply_annotations/audio_dashboard_table.py
def __init__(self, airtable_url: str, filter_formula: str, headers: Dict[str, str]) -> None:
    """Constructor for the `AudioDashboardTable` class.

    Sets no state of its own; all arguments are forwarded unchanged to the
    parent class constructor.

    Args:
        airtable_url (str): URL endpoint to AirTable table.
        filter_formula (str): Additional GET URL filter formula parameter.
        headers (Dict[str, str]): API Header containing authorization.
    """
    # Delegate entirely to the base-class constructor.
    super().__init__(airtable_url, filter_formula, headers)