In this example, we ask workers to record given texts with a voice recorder.
This is an annotation task, since there is an unlimited number of possible recordings for a single text.
This example has two features in addition to the usual annotation task:
- The worker's answer contains a media file (an audio recording), which has to be stored in external storage (we use S3).
- Worker answers are checked by an ASR model rather than by other workers.
You may want to study the audio transcript example first, because it contains a more detailed description of the annotation task pipeline.
from datetime import timedelta
import os
import pandas as pd
from typing import List, Tuple
import toloka.client as toloka
from crowdom import base, datasource, client, objects, pricing, params as labeling_params
import yaml
import logging.config
with open('logging.yaml') as f:
    logging.config.dictConfig(yaml.full_load(f.read()))
from IPython.display import clear_output, display
token = os.getenv('TOLOKA_TOKEN') or input('Enter your token: ')
clear_output()
toloka_client = client.create_toloka_client(token=token)
s3 = datasource.S3(
    endpoint='storage.yandexcloud.net',
    bucket=input('Enter your S3 bucket: '),
    path=input('Enter path in bucket to store audio recordings: '),
    access_key_id=os.getenv('AWS_ACCESS_KEY_ID') or input('Enter your AWS access key ID: '),
    secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY') or input('Enter your AWS secret access key: '),
)
clear_output()
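# The task function: the worker receives a text (input) and responds with an audio recording (output).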
function = base.AnnotationFunction(inputs=(objects.Text,), outputs=(objects.Audio,))
import markdown2
instruction = {}
for worker_lang in ['RU']:
    with open(f'instruction_{worker_lang}.md') as f:
        instruction[worker_lang] = markdown2.markdown(f.read())
task_spec = base.TaskSpec(
    id='voice-recording',
    function=function,
    name=base.LocalizedString({
        'EN': 'Voice recording',
        'RU': 'Запись речи на диктофон',
    }),
    description=base.LocalizedString({
        'EN': 'Speak the recordings into a voice recorder',
        'RU': 'Нужно наговорить записи на диктофон.',
    }),
    instruction=instruction,
)
lang = 'EN'
task_spec_en = client.AnnotationTaskSpec(task_spec, lang)
task_duration_hint = timedelta(seconds=10)
client.define_task(task_spec_en, toloka_client)
input_objects = datasource.read_tasks('tasks.json', task_spec_en.task_mapping)
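# For reference, tasks.json is a list of input objects. With a single Text input it might look like
# this (a hypothetical illustration; the exact schema is defined by task_spec_en.task_mapping):
# [{"text": "hello"}, {"text": "no thanks"}, {"text": "your order is accepted"}]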
# Checks are performed by the ASR model, which is not controlled in the usual way. We still need at least one control task for technical reasons.
control_objects = datasource.read_tasks('control_tasks.json', task_spec_en.check.task_mapping, has_solutions=True)
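# control_tasks.json additionally contains known solutions. Hypothetically, an entry could pair a
# text and an audio URL with the expected evaluation, e.g.:
# [{"text": "hello", "audio": "https://.../hello.wav", "ok": true}]
# (field names here are illustrative; the actual schema is defined by task_spec_en.check.task_mapping)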
We will use the Yandex SpeechKit ASR model in this example, but you can use any model you want.
# TODO: publish in pypi.org
%pip install -i http://pypi.yandex-team.ru/simple/ yandex-speechkit
%pip install pylev
from speechkit.common.utils import configure_credentials
from speechkit.common import Product
from speechkit import model_repository
from speechkit.stt import RecognitionConfig, AudioProcessingType
from IPython.display import clear_output
configure_credentials(yc_ai_token=f'Api-Key {input("Enter your ASR model API key: ")}')
clear_output()
model = model_repository.recognition_model(product=Product.Yandex)
lang_map = {'EN': 'en-US', 'RU': 'ru-RU'}
config = RecognitionConfig(mode=AudioProcessingType.Full, language=lang_map[lang])
from multiprocessing.pool import ThreadPool
from pydub import AudioSegment
import io
def recognize_record(s3_url: str) -> str:
    # Download the worker's recording from S3 and decode it as WAV.
    file_name = s3_url.split('/')[-1]
    audio_bytes = s3.client.get_object(Bucket=s3.bucket, Key=f'{s3.path}/{file_name}')['Body'].read()
    audio = AudioSegment.from_wav(io.BytesIO(audio_bytes))
    # Transcribe the audio and join the recognized chunks into a single string.
    result = model.transcribe(audio, config)
    return ' '.join(chunk.raw_text for chunk in result)
import pylev
def levenshtein(hypothesis: str, reference: str) -> float:
    return float(pylev.levenshtein(hypothesis, reference)) / max(len(hypothesis), len(reference), 1)
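# A quick sanity check of the normalized distance (illustrative asserts, not part of the pipeline):
assert levenshtein('hello', 'hallo') == 0.2  # one substitution over max length 5
assert levenshtein('', '') == 0.0  # max(..., 1) guards against division by zero on empty strings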
logger = logging.getLogger('crowdom')
A model is specified by its name (see below) and a Python function, which implements the task function for a batch of source items.
Since the model is used to evaluate worker recordings, we transform the task function
f(Text) = Audio
into its evaluation form
f(Text, Audio) = BinaryEvaluation
If we consider a worker's recording accurate, we return BinaryEvaluation(True) for it.
def recognize_voice_recordings(tasks: List[Tuple[objects.Text, objects.Audio]]) -> List[Tuple[base.BinaryEvaluation]]:
    if not tasks:
        return []
    # Recognize all recordings in parallel, with at most 40 threads.
    pool = ThreadPool(processes=min(len(tasks), 40))
    recognized_texts = pool.map(recognize_record, [audio.url for _, audio in tasks])
    results = []
    for (source_text, audio), recognized_text in zip(tasks, recognized_texts):
        distance = levenshtein(recognized_text, source_text.text)
        # Accept the recording if the recognized text is close enough to the source text.
        verdict = (base.BinaryEvaluation(distance <= 0.5),)
        results.append(verdict)
        logger.debug('\n' + f"""
audio: {audio.url}
source text: {source_text.text}
recognized text: {recognized_text}
distance: {distance}""".strip() + '\n')
    return results
from crowdom import worker
model_worker = worker.Model(name='asr:general', func=recognize_voice_recordings)
params_form = labeling_params.get_annotation_interface(
    task_spec=task_spec_en,
    check_task_duration_hint=task_duration_hint,
    annotation_task_duration_hint=task_duration_hint,
    toloka_client=toloka_client,
)
check_params, annotation_params = params_form.get_params()
check_params.control_tasks_count = 1  # we need to create the pool, even without opening it, with one stub control task
check_params.model = model_worker  # specify your model worker
artifacts = client.launch_annotation(
    task_spec_en,
    annotation_params,
    check_params,
    input_objects,
    control_objects,
    toloka_client,
    s3=s3,
)
results = artifacts.results
results.predict()
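The predict() call returns the resulting annotation for each input object; the output below shows the recorded audio for each source text: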
|   | text | audio |
|---|------|-------|
| 0 | hello | https://storage.yandexcloud.net/test/voice-recording/B2FA010A-80AD-4B9B-936F-E74AF44FDB42.wav |
| 1 | no thanks | https://storage.yandexcloud.net/test/voice-recording/509A1B21-E035-47CE-BE9C-C102470F159A.wav |
| 2 | your order is accepted | https://storage.yandexcloud.net/test/voice-recording/99BB1FB8-B9CC-4BDB-9DC2-9DFC6E404C98.wav |