import logging
from collections.abc import AsyncIterable
from typing import Annotated, Callable, Optional, cast

from dotenv import load_dotenv
from pydantic import Field
from pydantic_core import from_json
from typing_extensions import TypedDict

from livekit.agents import (
    NOT_GIVEN,
    Agent,
    AgentSession,
    ChatContext,
    FunctionTool,
    JobContext,
    ModelSettings,
    WorkerOptions,
    cli,
)
from livekit.plugins import openai, silero
from livekit.plugins.turn_detector.english import EnglishModel

logger = logging.getLogger("structured-output")

load_dotenv()
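
# load_dotenv() reads credentials from a local .env file; for this example that would typically
# include your LiveKit server credentials and OPENAI_API_KEY for the OpenAI plugin (exact
# variables depend on your deployment, so adjust to however you supply secrets).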


## This example demonstrates how to use structured output from the LLM to control the TTS.
## The LLM is instructed to provide a TTS directive, which is returned as a ResponseEmotion object,
## before generating the response.
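##
## For reference, a complete model turn is expected to look roughly like this (illustrative values):
##   {"voice_instructions": "Warm, upbeat tone, medium pace", "response": "Hi there! ..."}
## Only "response" is spoken and transcribed; "voice_instructions" is forwarded to the TTS.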


class ResponseEmotion(TypedDict):
    voice_instructions: Annotated[
        str,
        Field(..., description="Concise TTS directive for tone, emotion, intonation, and speed"),
    ]
    response: str
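
# Note: voice_instructions is declared before response, so (assuming the model emits fields in
# schema order) the directive is usually complete in the stream before any spoken text arrives.
# tts_node below relies on this ordering, as its own comment points out.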


async def process_structured_output(
    text: AsyncIterable[str],
    callback: Optional[Callable[[ResponseEmotion], None]] = None,
) -> AsyncIterable[str]:
    last_response = ""
    acc_text = ""
    async for chunk in text:
        acc_text += chunk
        try:
            resp: ResponseEmotion = from_json(acc_text, allow_partial="trailing-strings")
        except ValueError:
            continue

        if callback:
            callback(resp)

        if not resp.get("response"):
            continue

        new_delta = resp["response"][len(last_response) :]
        if new_delta:
            yield new_delta
        last_response = resp["response"]
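
# Illustrative trace of the incremental parse above (hypothetical chunks):
#   '{"voice_instructions": "Calm'    -> no "response" yet, nothing yielded
#   ' tone", "response": "Hel'        -> yields "Hel"
#   'lo!"}'                           -> yields "lo!"
# i.e. only the newly completed portion of "response" is forwarded on each chunk.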


class MyAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "Your name is Echo. You are an extraordinarily expressive voice assistant "
                "with mastery over vocal dynamics and emotions. Adapt your voice—modulate tone, "
                "pitch, speed, intonation, and convey emotions such as happiness, sadness, "
                "excitement, or calmness—to match the conversation context. "
                "Keep responses concise, clear, and engaging, turning every interaction into a "
                "captivating auditory performance."
            ),
            stt=openai.STT(model="gpt-4o-transcribe"),
            llm=openai.LLM(model="gpt-4o-mini"),
            tts=openai.TTS(model="gpt-4o-mini-tts"),
        )
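
    # Both overrides below cast self.llm / self.tts to the OpenAI plugin types, so swapping in a
    # different LLM or TTS here would also require adapting llm_node / tts_node accordingly.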

    async def llm_node(
        self, chat_ctx: ChatContext, tools: list[FunctionTool], model_settings: ModelSettings
    ):
        # not all LLMs support structured output, so we need to cast to the specific LLM type
        llm = cast(openai.LLM, self.llm)
        tool_choice = model_settings.tool_choice if model_settings else NOT_GIVEN
        async with llm.chat(
            chat_ctx=chat_ctx,
            tools=tools,
            tool_choice=tool_choice,
            response_format=ResponseEmotion,
        ) as stream:
            async for chunk in stream:
                yield chunk
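
    # With response_format=ResponseEmotion the LLM streams JSON rather than plain text, so the
    # downstream tts_node and transcription_node receive that JSON and must extract "response".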

    async def tts_node(self, text: AsyncIterable[str], model_settings: ModelSettings):
        instruction_updated = False

        def output_processed(resp: ResponseEmotion):
            nonlocal instruction_updated
            if resp.get("voice_instructions") and resp.get("response") and not instruction_updated:
                # when the response isn't empty, we can assume voice_instructions is complete
                # (if the LLM sent the fields in the right order)
                instruction_updated = True
                logger.info(
                    f"Applying TTS instructions before generating response audio: "
                    f'"{resp["voice_instructions"]}"'
                )

                tts = cast(openai.TTS, self.tts)
                tts.update_options(instructions=resp["voice_instructions"])

        # process_structured_output strips the TTS instructions and only synthesizes the verbal
        # part of the LLM output
        return Agent.default.tts_node(
            self, process_structured_output(text, callback=output_processed), model_settings
        )

    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings):
        # transcription_node needs to return what the agent would say, minus the TTS instructions
        return Agent.default.transcription_node(
            self, process_structured_output(text), model_settings
        )


async def entrypoint(ctx: JobContext):
    await ctx.connect()

    session = AgentSession(
        vad=silero.VAD.load(),
        turn_detection=EnglishModel(),
    )

    await session.start(agent=MyAgent(), room=ctx.room)
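
# Note that AgentSession above only supplies VAD and turn detection; the STT, LLM, and TTS
# components come from MyAgent's own configuration.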


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
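
# To try it out, this script is typically launched through the livekit-agents CLI, e.g.
# `python structured_output.py console` for a local terminal session or `python
# structured_output.py dev` to connect to a LiveKit room (available subcommands depend on
# your livekit-agents version).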