Commit dd9e6a1

feat(google/gemini): add THINKING/STREAMING env toggles, emit chat lifecycle events, bump version
- Add Pipe.Valves flags for THINKING_ENABLED and STREAMING_ENABLED (driven by env vars GOOGLE_THINKING_ENABLED / GOOGLE_STREAMING_ENABLED) and honor them to disable thinking or streaming when set.
- Introduce internal emit_chat_event helper and emit chat lifecycle events (chat:start, chat:message, replace, chat:finish) so downstream consumers receive structured start/replace/finish notifications.
- Improve streaming handling: include role in message deltas, emit finish events on safety blocks and errors (with error flag), and ensure the final full response is emitted after streaming ends.
- Provide clearer logging when features are disabled and standardize streamed error messages.
- Bump pipeline version to 1.6.5.
- Update docs to surface the new environment variables.
1 parent 57631e8 commit dd9e6a1

2 files changed (+103, -22 lines changed)

docs/google-gemini-integration.md

Lines changed: 8 additions & 0 deletions
@@ -118,6 +118,14 @@ GOOGLE_IMAGE_HISTORY_FIRST=true
 # Enable fallback to data URL when image upload fails
 # Default: true
 GOOGLE_IMAGE_UPLOAD_FALLBACK=true
+
+# Enable Gemini thinking outputs globally
+# Default: true
+GOOGLE_THINKING_ENABLED=true
+
+# Enable streaming responses globally
+# Default: true
+GOOGLE_STREAMING_ENABLED=true
 ```
 
 ### Connection Method: Google Generative AI API (Default)
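
The new variables follow the same parsing convention as the pipeline's other boolean settings (see the Valves diff below): the value is compared case-insensitively against the literal string "true", so anything else, including "1" or "yes", disables the feature. A minimal sketch of that pattern, with env_flag as a hypothetical helper name:

```python
import os

def env_flag(name: str, default: str = "true") -> bool:
    # Hypothetical helper mirroring the valve defaults:
    # only the string "true" (any casing) enables the feature.
    return os.getenv(name, default).lower() == "true"

thinking_enabled = env_flag("GOOGLE_THINKING_ENABLED")
streaming_enabled = env_flag("GOOGLE_STREAMING_ENABLED")
```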

pipelines/google/google_gemini.py

Lines changed: 95 additions & 22 deletions
@@ -4,7 +4,7 @@
 author_url: https://github.com/owndev/
 project_url: https://github.com/owndev/Open-WebUI-Functions
 funding_url: https://github.com/sponsors/owndev
-version: 1.6.4
+version: 1.6.5
 license: Apache License 2.0
 description: Highly optimized Google Gemini pipeline with advanced image generation capabilities, intelligent compression, and streamlined processing workflows.
 features:
@@ -158,6 +158,14 @@ class Valves(BaseModel):
             default=os.getenv("GOOGLE_API_VERSION", "v1alpha"),
             description="API version to use for Google Generative AI (e.g., v1alpha, v1beta, v1).",
         )
+        THINKING_ENABLED: bool = Field(
+            default=os.getenv("GOOGLE_THINKING_ENABLED", "true").lower() == "true",
+            description="Enable Gemini thinking outputs (set false to disable).",
+        )
+        STREAMING_ENABLED: bool = Field(
+            default=os.getenv("GOOGLE_STREAMING_ENABLED", "true").lower() == "true",
+            description="Enable streaming responses (set false to force non-streaming mode).",
+        )
         USE_VERTEX_AI: bool = Field(
             default=os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "false").lower() == "true",
             description="Whether to use Google Cloud Vertex AI instead of the Google Generative AI API.",
@@ -1415,6 +1423,10 @@ def _configure_generation(
 
         # Enable Gemini "Thinking" when requested (default: on) and supported by the model
         include_thoughts = body.get("include_thoughts", True)
+        if not self.valves.THINKING_ENABLED:
+            include_thoughts = False
+            self.log.debug("Thinking disabled via GOOGLE_THINKING_ENABLED")
+
         if include_thoughts and self._check_thinking_support(model_id):
             try:
                 gen_config_params["thinking_config"] = types.ThinkingConfig(
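
The valve acts as a global override of the per-request include_thoughts flag: even when a request asks for thoughts, a disabled valve turns them off, and thinking is still only configured for models that support it. A rough sketch of the effective decision, with valves, body, and supports_thinking standing in for the real objects:

```python
def thoughts_requested(valves, body: dict, supports_thinking: bool) -> bool:
    # Per-request opt-in, defaulting to on.
    include_thoughts = body.get("include_thoughts", True)
    # Global override: GOOGLE_THINKING_ENABLED=false disables thinking everywhere.
    if not valves.THINKING_ENABLED:
        include_thoughts = False
    # Thinking config is only attached when the model supports it.
    return include_thoughts and supports_thinking
```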
@@ -1591,6 +1603,17 @@ async def _handle_streaming_response(
         Returns:
             Generator yielding text chunks
         """
+
+        async def emit_chat_event(event_type: str, data: Dict[str, Any]) -> None:
+            if not __event_emitter__:
+                return
+            try:
+                await __event_emitter__({"type": event_type, "data": data})
+            except Exception as emit_error:  # pragma: no cover - defensive
+                self.log.warning(f"Failed to emit {event_type} event: {emit_error}")
+
+        await emit_chat_event("chat:start", {"role": "assistant"})
+
         grounding_metadata_list = []
         # Accumulate content separately for answer and thoughts
         answer_chunks: list[str] = []
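
Each lifecycle event goes through the supplied __event_emitter__ callback as a {"type": ..., "data": ...} payload, so a consumer only needs to dispatch on the type field. A hypothetical handler, using only the event names and fields visible in this diff:

```python
from typing import Any, Dict

async def handle_event(event: Dict[str, Any]) -> None:
    # Hypothetical consumer; Open WebUI supplies its own __event_emitter__.
    event_type, data = event["type"], event["data"]
    if event_type == "chat:start":
        print("assistant started")
    elif event_type == "chat:message:delta":
        print(data["content"], end="")              # incremental text
    elif event_type in ("replace", "chat:message"):
        print("\nfull content:", data["content"])   # complete response
    elif event_type == "chat:finish":
        print("\ndone, error =", data.get("error", False))
```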
@@ -1606,9 +1629,32 @@
                         response_iterator.prompt_feedback
                         and response_iterator.prompt_feedback.block_reason
                     ):
-                        yield f"[Blocked due to Prompt Safety: {response_iterator.prompt_feedback.block_reason.name}]"
+                        block_reason = (
+                            response_iterator.prompt_feedback.block_reason.name
+                        )
+                        message = f"[Blocked due to Prompt Safety: {block_reason}]"
+                        await emit_chat_event(
+                            "chat:finish",
+                            {
+                                "role": "assistant",
+                                "content": message,
+                                "done": True,
+                                "error": True,
+                            },
+                        )
+                        yield message
                     else:
-                        yield "[Blocked by safety settings]"
+                        message = "[Blocked by safety settings]"
+                        await emit_chat_event(
+                            "chat:finish",
+                            {
+                                "role": "assistant",
+                                "content": message,
+                                "done": True,
+                                "error": True,
+                            },
+                        )
+                        yield message
                     return  # Stop generation
 
                 if chunk.candidates[0].grounding_metadata:
@@ -1627,7 +1673,10 @@
                        await __event_emitter__(
                            {
                                "type": "chat:message:delta",
-                               "data": {"content": chunk.text},
+                               "data": {
+                                   "role": "assistant",
+                                   "content": chunk.text,
+                               },
                            }
                        )
                    continue
@@ -1664,7 +1713,10 @@
                        await __event_emitter__(
                            {
                                "type": "chat:message:delta",
-                               "data": {"content": part.text},
+                               "data": {
+                                   "role": "assistant",
+                                   "content": part.text,
+                               },
                            }
                        )
                    except Exception as part_error:
@@ -1684,7 +1736,9 @@
             )
             final_answer_text = cited or final_answer_text
 
-            # If we captured thoughts, wrap them in a collapsible <details> section
+            final_content = final_answer_text
+            details_block: Optional[str] = None
+
             if thought_chunks:
                 duration_s = int(
                     max(0, time.time() - (thinking_started_at or time.time()))
@@ -1702,39 +1756,54 @@
 {quoted_content}
 
 </details>""".strip()
+                final_content = f"{details_block}{final_answer_text}"
 
-                # Combine thoughts and answer (images not processed in streaming mode)
-                full_content = details_block + final_answer_text
+            if final_content is None:
+                final_content = ""
 
-                await __event_emitter__(
-                    {
-                        "type": "replace",
-                        "data": {"content": full_content},
-                    }
-                )
+            # Ensure downstream consumers (UI, TTS) receive the complete response once streaming ends.
+            await emit_chat_event(
+                "replace", {"role": "assistant", "content": final_content}
+            )
+            await emit_chat_event(
+                "chat:message",
+                {"role": "assistant", "content": final_content, "done": True},
+            )
+
+            if thought_chunks:
                 # Clear the thinking status without a summary in the status emitter
                 await __event_emitter__(
                     {
                         "type": "status",
                         "data": {"action": "thinking", "done": True, "hidden": True},
                     }
                 )
-            elif grounding_metadata_list:
-                # If no thoughts but we have grounding, ensure final answer (with citations) is reflected
-                await __event_emitter__(
-                    {"type": "replace", "data": {"content": final_answer_text}}
-                )
+
+            await emit_chat_event(
+                "chat:finish",
+                {"role": "assistant", "content": final_content, "done": True},
+            )
 
         except Exception as e:
            self.log.exception(f"Error during streaming: {e}")
            # Check if it's a chunk size error and provide specific guidance
            error_msg = str(e).lower()
            if "chunk too big" in error_msg or "chunk size" in error_msg:
-               yield f"Error: Image too large for processing. Please try with a smaller image (max 15 MB recommended) or reduce image quality."
+               message = "Error: Image too large for processing. Please try with a smaller image (max 15 MB recommended) or reduce image quality."
            elif "quota" in error_msg or "rate limit" in error_msg:
-               yield f"Error: API quota exceeded. Please try again later."
+               message = "Error: API quota exceeded. Please try again later."
            else:
-               yield f"Error during streaming: {e}"
+               message = f"Error during streaming: {e}"
+           await emit_chat_event(
+               "chat:finish",
+               {
+                   "role": "assistant",
+                   "content": message,
+                   "done": True,
+                   "error": True,
+               },
+           )
+           yield message
 
     def _get_safety_block_message(self, response: Any) -> Optional[str]:
         """Check for safety blocks and return appropriate message."""
@@ -1845,6 +1914,10 @@ async def pipe(
 
         # Get stream flag
         stream = body.get("stream", False)
+        if not self.valves.STREAMING_ENABLED:
+            if stream:
+                self.log.debug("Streaming disabled via GOOGLE_STREAMING_ENABLED")
+            stream = False
         messages = body.get("messages", [])
 
         # For image generation models, gather ALL images from the last user turn
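
With GOOGLE_STREAMING_ENABLED=false, a request that asks for streaming is quietly downgraded to a single non-streaming response (with a debug log). A small sketch of the effective behaviour, valves and body again standing in for the real objects:

```python
def effective_stream(valves, body: dict) -> bool:
    # Request-level flag, defaulting to non-streaming.
    stream = body.get("stream", False)
    # Global valve forces non-streaming regardless of the request.
    if not valves.STREAMING_ENABLED:
        stream = False
    return stream

# e.g. effective_stream(valves, {"stream": True}) is False when the valve is off.
```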
