Skip to content

AirTable S3 Integration

airtable_apply_annotations.airtable_s3_integration

AirTableS3Integration

Source code in src/airtable_apply_annotations/airtable_s3_integration.py
class AirTableS3Integration:
    """Pulls records from an AirTable table, applies them to S3 and patches them back.

    The two hooks `_apply_annotation_changes_s3` and `_finalize_records` have
    empty bodies here; presumably subclasses override them with the
    project-specific logic.
    """

    def __init__(self, airtable_url: str, filter_formula: str, headers: Dict[str, str]):
        """Constructor for the `AirTableS3Integration` class.

        Args:
            airtable_url (str): URL endpoint to AirTable table.
            filter_formula (str): Additional GET URL filter formula parameter.
            headers (Dict[str, str]): API Header containing authorization.
        """
        self.airtable_url = airtable_url
        self.filter_formula = filter_formula
        self.headers = headers
        # `BUCKET` and `EXTENSIONS` are module-level names defined outside this class.
        self.bucket = BUCKET
        self.extensions = EXTENSIONS

    def process_table_data(self):
        """
        Gets AirTable data and applies annotation changes to S3 and finalizes the
        record.

        Records are fetched page by page, following the `"offset"` value found in
        each response until a response carries none. Fetching stops early after
        three consecutive failed requests (raised exception or non-OK response)
        instead of retrying forever; whatever was fetched up to that point is
        still processed.

        The fetched records are then handled in batches of 10: every record of a
        batch goes through `_apply_annotation_changes_s3`, after which the batch
        is finalized and patched back in a single request.
        """
        # Upper bound on back-to-back failures, so that a persistent error
        # (bad credentials, network down, malformed URL) cannot spin forever.
        max_failures = 3
        records, offset, failures = [], 0, 0

        while failures < max_failures:
            try:
                response = requests.get(
                    # Joined with "&": `airtable_url` is expected to already
                    # carry a query string.
                    f"{self.airtable_url}&{self.filter_formula}",
                    params={"offset": offset},
                    headers=self.headers,
                )
            except Exception as exc:
                print(exc)
                failures += 1
                continue

            if not response.ok:
                print("Failed to get data from AirTable")
                failures += 1
                continue

            # A successful page resets the consecutive-failure counter.
            failures = 0
            page = response.json()
            records += page["records"]

            if "offset" not in page:
                # Last page reached.
                break
            offset = page["offset"]
        else:
            # Only reached when the loop ended without `break`, i.e. the
            # failure limit was hit.
            print(f"Giving up on AirTable after {max_failures} consecutive failures")

        # batch size: 10
        for i in range(0, len(records), 10):
            batch = records[i : i + 10]
            for record in batch:
                self._apply_annotation_changes_s3(record)
            self._patch_record(self._finalize_records(batch))

    def _patch_record(self, payload: str):
        """Patches `payload` to `self.airtable_url` with authorized `self.headers`.

        Failures are reported on stdout and never raised: both an exception from
        the request and a non-OK response only print a message.

        Args:
            payload (str): Record payload.
        """
        try:
            response = requests.patch(
                self.airtable_url, headers=self.headers, data=payload
            )
        except Exception as exc:
            print(exc)
        else:
            if not response.ok:
                print(f"Failed to patch {payload}")

    def _apply_annotation_changes_s3(self, record: Dict[str, Any]):
        """Applies changes in an S3 directory based on an AirTable `record`'s annotation
        verdict.

        This implementation does nothing.

        Args:
            record (Dict[str, Any]): An AirTable record/row.
        """
        pass

    def _finalize_records(self, records: List[Dict[str, Any]]) -> str:
        """Finalizes records by marking "AWS" column as `True`.

        This implementation does nothing and therefore returns `None`.

        Args:
            records (List[Dict[str, Any]]): AirTable records.

        Returns:
            str: Finalized record payload.
        """
        pass

__init__(airtable_url, filter_formula, headers)

Constructor for the AirTableS3Integration class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| airtable_url | str | URL endpoint to AirTable table. | required |
| filter_formula | str | Additional GET URL filter formula parameter. | required |
| headers | Dict[str, str] | API Header containing authorization. | required |
Source code in src/airtable_apply_annotations/airtable_s3_integration.py
def __init__(self, airtable_url: str, filter_formula: str, headers: Dict[str, str]):
    """Set up an `AirTableS3Integration` instance.

    Args:
        airtable_url (str): Endpoint URL of the AirTable table.
        filter_formula (str): Extra filter formula parameter appended to GET URLs.
        headers (Dict[str, str]): Request headers carrying the API authorization.
    """
    # Caller-supplied connection settings.
    self.airtable_url, self.filter_formula, self.headers = (
        airtable_url,
        filter_formula,
        headers,
    )
    # Storage settings taken from the module-level `BUCKET` / `EXTENSIONS` names.
    self.bucket, self.extensions = BUCKET, EXTENSIONS

process_table_data()

Gets AirTable data and applies annotation changes to S3 and finalizes the record.

Source code in src/airtable_apply_annotations/airtable_s3_integration.py
def process_table_data(self):
    """
    Gets AirTable data and applies annotation changes to S3 and finalizes the
    record.

    Records are fetched page by page, following the `"offset"` value found in
    each response until a response carries none. Fetching stops early after
    three consecutive failed requests (raised exception or non-OK response)
    instead of retrying forever; whatever was fetched up to that point is
    still processed.

    The fetched records are then handled in batches of 10: every record of a
    batch goes through `_apply_annotation_changes_s3`, after which the batch
    is finalized and patched back in a single request.
    """
    # Upper bound on back-to-back failures, so that a persistent error
    # (bad credentials, network down, malformed URL) cannot spin forever.
    max_failures = 3
    records, offset, failures = [], 0, 0

    while failures < max_failures:
        try:
            response = requests.get(
                # Joined with "&": `airtable_url` is expected to already
                # carry a query string.
                f"{self.airtable_url}&{self.filter_formula}",
                params={"offset": offset},
                headers=self.headers,
            )
        except Exception as exc:
            print(exc)
            failures += 1
            continue

        if not response.ok:
            print("Failed to get data from AirTable")
            failures += 1
            continue

        # A successful page resets the consecutive-failure counter.
        failures = 0
        page = response.json()
        records += page["records"]

        if "offset" not in page:
            # Last page reached.
            break
        offset = page["offset"]
    else:
        # Only reached when the loop ended without `break`, i.e. the
        # failure limit was hit.
        print(f"Giving up on AirTable after {max_failures} consecutive failures")

    # batch size: 10
    for i in range(0, len(records), 10):
        batch = records[i : i + 10]
        for record in batch:
            self._apply_annotation_changes_s3(record)
        self._patch_record(self._finalize_records(batch))