Audio Segmenter
speechline.segmenters.segmenter.Segmenter
Source code in speechline/segmenters/segmenter.py
class Segmenter:
    """Chunks audio into segments and exports aligned WAV/TSV artifacts.

    Relies on `chunk_offsets` (defined on a subclass or elsewhere in the
    module — not visible here) to partition phoneme offsets into segments,
    then optionally tags and classifies silent gaps as noise.
    """

    def chunk_audio_segments(
        self,
        audio_path: str,
        outdir: str,
        offsets: List[Dict[str, Union[str, float]]],
        do_noise_classify: bool = False,
        minimum_chunk_duration: float = 1.0,
        **kwargs,
    ) -> List[List[Dict[str, Union[str, float]]]]:
        """
        Chunks an audio file based on its offsets.
        Generates and exports WAV audio chunks and aligned TSV phoneme transcripts.

        Args:
            audio_path (str):
                Path to audio file to chunk. A dict with `"array"`,
                `"sampling_rate"`, and `"path"` keys (in-memory audio) is
                also accepted, per the `isinstance` branches below.
            outdir (str):
                Output directory to save chunked audio.
                Per-region subfolders will be generated under this directory.
            offsets (List[Dict[str, Union[str, float]]]):
                List of phoneme offsets.
            do_noise_classify (bool, optional):
                Whether to perform noise classification on empty chunks.
                Defaults to `False`.
            minimum_chunk_duration (float, optional):
                Minimum chunk duration (in seconds) to be exported.
                Defaults to 1.0 second.

        Returns:
            List[List[Dict[str, Union[str, float]]]]:
                List of shifted offsets for every segment.
        """
        segments = self.chunk_offsets(offsets, **kwargs)
        # skip empty segments (undetected transcripts)
        if len(segments) == 0:
            return [[{}]]
        if do_noise_classify:
            segments = self.insert_empty_tags(segments, **kwargs)
            segments = self.classify_noise(segments, audio_path, **kwargs)
        if isinstance(audio_path, str):
            audio = AudioSegment.from_file(audio_path)
        elif isinstance(audio_path, dict):
            # in-memory audio: convert to pydub, then keep only the path string
            # for the output-path helpers below
            audio = np_f32_to_pydub(audio_path)
            audio_path = audio_path["path"]
        # pydub slices in milliseconds; offsets are in seconds
        audio_segments: List[AudioSegment] = [
            audio[s[0]["start_time"] * 1000 : s[-1]["end_time"] * 1000] for s in segments
        ]
        # shift segments based on their respective index start times
        shifted_segments = [self._shift_offsets(segment) for segment in segments]
        # create output directory folder and subfolders
        os.makedirs(get_outdir_path(audio_path, outdir), exist_ok=True)
        for idx, (segment, audio_segment) in enumerate(zip(shifted_segments, audio_segments)):
            # skip export if audio segment does not meet minimum chunk duration
            # (len of a pydub AudioSegment is its duration in milliseconds)
            if len(audio_segment) < minimum_chunk_duration * 1000:
                continue
            # export TSV transcripts and WAV audio segment
            output_tsv_path = get_chunk_path(audio_path, outdir, idx, "tsv")
            export_segment_transcripts_tsv(output_tsv_path, segment)
            output_audio_path = get_chunk_path(audio_path, outdir, idx, "wav")
            export_segment_audio_wav(output_audio_path, audio_segment)
        return shifted_segments

    def classify_noise(
        self,
        segments: List[List[Dict[str, Union[str, float]]]],
        audio_path: str,
        noise_classifier: AudioModule,
        noise_classifier_threshold: float,
        empty_tag: str = "<EMPTY>",
        **kwargs,
    ) -> List[List[Dict[str, Union[str, float]]]]:
        """
        Classify empty tags as noise.

        Args:
            segments (List[List[Dict[str, Union[str, float]]]]):
                List of chunked segments with empty tag.
            audio_path (str):
                Path to audio file to chunk.
            noise_classifier (AudioModule):
                Audio Module to perform noise classification.
            noise_classifier_threshold (float):
                Minimum probability threshold for multi label classification.
            empty_tag (str, optional):
                Special empty tag.
                Defaults to `"<EMPTY>"`.

        Returns:
            List[List[Dict[str, Union[str, float]]]]:
                Chunk segments with classified noise tags (mutated in place).
        """
        # map each empty tag's flat position -> (segment index, offset index)
        pos, empty_tag_pos = 0, {}
        for i, segment in enumerate(segments):
            for j, offset in enumerate(segment):
                if offset["text"] == empty_tag:
                    empty_tag_pos[pos] = (i, j)
                    pos += 1
        # return original segments if no empty tags
        if len(empty_tag_pos) == 0:
            return segments
        audio = AudioSegment.from_file(audio_path)
        # slice out each empty span; iteration order matches empty_tag_pos,
        # so prediction idx lines up with the (i, j) bookkeeping above
        audio_arrays = [
            {
                "path": None,
                "array": pydub_to_np(audio[offset["start_time"] * 1000 : offset["end_time"] * 1000]),
                "sampling_rate": audio.frame_rate,
            }
            for segment in segments
            for offset in segment
            if offset["text"] == empty_tag
        ]
        dataset = Dataset.from_dict({"audio": audio_arrays})
        # resample to the classifier's expected sampling rate
        dataset = dataset.cast_column("audio", Audio(sampling_rate=noise_classifier.sampling_rate))
        outputs = noise_classifier.predict(dataset, threshold=noise_classifier_threshold)
        for idx, predictions in enumerate(outputs):
            if len(predictions) > 0:
                i, j = empty_tag_pos[idx]
                offset = segments[i][j]
                # relabel with the highest-scoring noise class
                label = max(predictions, key=lambda item: item["score"])["label"]
                offset["text"] = f"<{label}>"
        return segments

    def insert_empty_tags(
        self,
        segments: List[List[Dict[str, Union[str, float]]]],
        minimum_empty_duration: float,
        empty_tag: str = "<EMPTY>",
        **kwargs,
    ) -> List[List[Dict[str, Union[str, float]]]]:
        """
        Inserts special `<EMPTY>` tag to mark for noise classification.
        Inserts tags at indices in segments where empty duration
        is at least `minimum_empty_duration`.

        Args:
            segments (List[List[Dict[str, Union[str, float]]]]):
                List of chunked segments to insert into.
            minimum_empty_duration (float):
                Minimum silence duration in seconds.
            empty_tag (str, optional):
                Special empty tag.
                Defaults to "<EMPTY>".

        Returns:
            List[List[Dict[str, Union[str, float]]]]:
                Updated segments where empty tags have been inserted (in place).
        """
        for segment in segments:
            # gap between each consecutive pair of offsets, in seconds
            gaps = [round(nxt["start_time"] - curr["end_time"], 3) for curr, nxt in zip(segment, segment[1:])]
            # iterate in reverse so insertions don't invalidate earlier indices
            for idx, gap in reversed(list(enumerate(gaps))):
                if gap >= minimum_empty_duration:
                    start_time = segment[idx]["end_time"]
                    end_time = segment[idx + 1]["start_time"]
                    empty_offset = {
                        "text": empty_tag,
                        "start_time": start_time,
                        "end_time": end_time,
                    }
                    segment.insert(idx + 1, empty_offset)
        return segments

    def _shift_offsets(self, offset: List[Dict[str, Union[str, float]]]) -> List[Dict[str, Union[str, float]]]:
        """
        Shift start and end time of offsets by index start time.
        Subtracts all start and end times by index start time.

        Args:
            offset (List[Dict[str, Union[str, float]]]):
                Offsets to shift.

        Returns:
            List[Dict[str, Union[str, float]]]:
                Shifted offsets.
        """
        index_start = offset[0]["start_time"]
        shifted_offset = [
            {
                "text": o["text"],
                "start_time": round(o["start_time"] - index_start, 3),
                "end_time": round(o["end_time"] - index_start, 3),
            }
            for o in offset
        ]
        return shifted_offset
chunk_audio_segments(self, audio_path, outdir, offsets, do_noise_classify=False, minimum_chunk_duration=1.0, **kwargs)
Chunks an audio file based on its offsets. Generates and exports WAV audio chunks and aligned TSV phoneme transcripts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
audio_path |
str |
Path to audio file to chunk. |
required |
outdir |
str |
Output directory to save chunked audio. Per-region subfolders will be generated under this directory. |
required |
offsets |
List[Dict[str, Union[str, float]]] |
List of phoneme offsets. |
required |
do_noise_classify |
bool |
Whether to perform noise classification on empty chunks.
Defaults to |
False |
minimum_chunk_duration |
float |
Minimum chunk duration (in seconds) to be exported. Defaults to 1.0 second. |
1.0 |
Returns:
Type | Description |
---|---|
List[List[Dict[str, Union[str, float]]]] |
List of offsets for every segment. |
Source code in speechline/segmenters/segmenter.py
def chunk_audio_segments(
    self,
    audio_path: str,
    outdir: str,
    offsets: List[Dict[str, Union[str, float]]],
    do_noise_classify: bool = False,
    minimum_chunk_duration: float = 1.0,
    **kwargs,
) -> List[List[Dict[str, Union[str, float]]]]:
    """
    Chunks an audio file based on its offsets.
    Generates and exports WAV audio chunks and aligned TSV phoneme transcripts.

    Args:
        audio_path (str):
            Path to audio file to chunk.
            NOTE(review): the isinstance branches below also accept a dict
            with "array"/"sampling_rate"/"path" keys (in-memory audio).
        outdir (str):
            Output directory to save chunked audio.
            Per-region subfolders will be generated under this directory.
        offsets (List[Dict[str, Union[str, float]]]):
            List of phoneme offsets.
        do_noise_classify (bool, optional):
            Whether to perform noise classification on empty chunks.
            Defaults to `False`.
        minimum_chunk_duration (float, optional):
            Minimum chunk duration (in seconds) to be exported.
            Defaults to 1.0 second.

    Returns:
        List[List[Dict[str, Union[str, float]]]]:
            List of offsets for every segment.
    """
    segments = self.chunk_offsets(offsets, **kwargs)
    # skip empty segments (undetected transcripts)
    if len(segments) == 0:
        return [[{}]]
    if do_noise_classify:
        segments = self.insert_empty_tags(segments, **kwargs)
        segments = self.classify_noise(segments, audio_path, **kwargs)
    if isinstance(audio_path, str):
        audio = AudioSegment.from_file(audio_path)
    elif isinstance(audio_path, dict):
        # in-memory audio: convert to pydub, keep only the path string
        audio = np_f32_to_pydub(audio_path)
        audio_path = audio_path["path"]
    # pydub slices in milliseconds; offset times are in seconds
    audio_segments: List[AudioSegment] = [
        audio[s[0]["start_time"] * 1000 : s[-1]["end_time"] * 1000] for s in segments
    ]
    # shift segments based on their respective index start times
    shifted_segments = [self._shift_offsets(segment) for segment in segments]
    # create output directory folder and subfolders
    os.makedirs(get_outdir_path(audio_path, outdir), exist_ok=True)
    for idx, (segment, audio_segment) in enumerate(zip(shifted_segments, audio_segments)):
        # skip export if audio segment does not meet minimum chunk duration
        if len(audio_segment) < minimum_chunk_duration * 1000:
            continue
        # export TSV transcripts and WAV audio segment
        output_tsv_path = get_chunk_path(audio_path, outdir, idx, "tsv")
        export_segment_transcripts_tsv(output_tsv_path, segment)
        output_audio_path = get_chunk_path(audio_path, outdir, idx, "wav")
        export_segment_audio_wav(output_audio_path, audio_segment)
    return shifted_segments
classify_noise(self, segments, audio_path, noise_classifier, noise_classifier_threshold, empty_tag='<EMPTY>', **kwargs)
Classify empty tags as noise.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
segments |
List[List[Dict[str, Union[str, float]]]] |
List of chunked segments with empty tag. |
required |
audio_path |
str |
Path to audio file to chunk. |
required |
noise_classifier |
AudioModule |
Audio Module to perform noise classification. |
required |
noise_classifier_threshold |
float |
Minimum probability threshold for multi label classification. |
required |
empty_tag |
str |
Special empty tag.
Defaults to |
'<EMPTY>' |
Returns:
Type | Description |
---|---|
List[List[Dict[str, Union[str, float]]]] |
Chunk segments with classified noise tags. |
Source code in speechline/segmenters/segmenter.py
def classify_noise(
    self,
    segments: List[List[Dict[str, Union[str, float]]]],
    audio_path: str,
    noise_classifier: AudioModule,
    noise_classifier_threshold: float,
    empty_tag: str = "<EMPTY>",
    **kwargs,
) -> List[List[Dict[str, Union[str, float]]]]:
    """
    Classify empty tags as noise.

    Args:
        segments (List[List[Dict[str, Union[str, float]]]]):
            List of chunked segments with empty tag.
        audio_path (str):
            Path to audio file to chunk.
        noise_classifier (AudioModule):
            Audio Module to perform noise classification.
        noise_classifier_threshold (float):
            Minimum probability threshold for multi label classification.
        empty_tag (str, optional):
            Special empty tag.
            Defaults to `"<EMPTY>"`.

    Returns:
        List[List[Dict[str, Union[str, float]]]]:
            Chunk segments with classified noise tags (mutated in place).
    """
    # map each empty tag's flat position -> (segment index, offset index)
    pos, empty_tag_pos = 0, {}
    for i, segment in enumerate(segments):
        for j, offset in enumerate(segment):
            if offset["text"] == empty_tag:
                empty_tag_pos[pos] = (i, j)
                pos += 1
    # return original segments if no empty tags
    if len(empty_tag_pos) == 0:
        return segments
    audio = AudioSegment.from_file(audio_path)
    # slice each empty span (pydub slices in ms, offsets are seconds);
    # iteration order matches empty_tag_pos, so prediction index idx
    # lines up with the (i, j) bookkeeping above
    audio_arrays = [
        {
            "path": None,
            "array": pydub_to_np(audio[offset["start_time"] * 1000 : offset["end_time"] * 1000]),
            "sampling_rate": audio.frame_rate,
        }
        for segment in segments
        for offset in segment
        if offset["text"] == empty_tag
    ]
    dataset = Dataset.from_dict({"audio": audio_arrays})
    # resample to the classifier's expected sampling rate
    dataset = dataset.cast_column("audio", Audio(sampling_rate=noise_classifier.sampling_rate))
    outputs = noise_classifier.predict(dataset, threshold=noise_classifier_threshold)
    for idx, predictions in enumerate(outputs):
        if len(predictions) > 0:
            i, j = empty_tag_pos[idx]
            offset = segments[i][j]
            # relabel with the highest-scoring noise class
            label = max(predictions, key=lambda item: item["score"])["label"]
            offset["text"] = f"<{label}>"
    return segments
insert_empty_tags(self, segments, minimum_empty_duration, empty_tag='<EMPTY>', **kwargs)
Inserts the special `<EMPTY>` tag to mark spans for noise classification.
Tags are inserted at indices in segments where the empty duration
is at least `minimum_empty_duration`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
segments |
List[List[Dict[str, Union[str, float]]]] |
List of chunked segments to insert into. |
required |
minimum_empty_duration |
float |
Minimum silence duration in seconds. |
required |
empty_tag |
str |
Special empty tag.
Defaults to |
'<EMPTY>' |
Returns:
Type | Description |
---|---|
List[List[Dict[str, Union[str, float]]]] |
Updated segments where empty tags have been inserted. |
Source code in speechline/segmenters/segmenter.py
def insert_empty_tags(
    self,
    segments: List[List[Dict[str, Union[str, float]]]],
    minimum_empty_duration: float,
    empty_tag: str = "<EMPTY>",
    **kwargs,
) -> List[List[Dict[str, Union[str, float]]]]:
    """
    Inserts special `<EMPTY>` tag to mark for noise classification.
    Inserts tags at indices in segments where empty duration
    is at least `minimum_empty_duration`.

    Args:
        segments (List[List[Dict[str, Union[str, float]]]]):
            List of chunked segments to insert into.
        minimum_empty_duration (float):
            Minimum silence duration in seconds.
        empty_tag (str, optional):
            Special empty tag.
            Defaults to "<EMPTY>".

    Returns:
        List[List[Dict[str, Union[str, float]]]]:
            Updated segments where empty tags have been inserted (in place).
    """
    for segment in segments:
        # gap between each consecutive pair of offsets, in seconds
        # NOTE(review): loop variable `next` shadows the builtin of the same name
        gaps = [round(next["start_time"] - curr["end_time"], 3) for curr, next in zip(segment, segment[1:])]
        # iterate in reverse so insertions don't invalidate earlier indices
        for idx, gap in reversed(list(enumerate(gaps))):
            if gap >= minimum_empty_duration:
                start_time = segment[idx]["end_time"]
                end_time = segment[idx + 1]["start_time"]
                empty_offset = {
                    "text": empty_tag,
                    "start_time": start_time,
                    "end_time": end_time,
                }
                segment.insert(idx + 1, empty_offset)
    return segments