mirror of https://github.com/xtekky/gpt4free
Add ChatgptDuo and Aibn Provider
Add support for "nest_asyncio", Reuse event_loops with event_loop_policy Support for "create_async" with synchron providerpull/943/head
parent
72c3ff7a25
commit
3c2755bc72
@ -0,0 +1,51 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
import hashlib
|
||||
|
||||
from ..typing import AsyncGenerator
|
||||
from g4f.requests import AsyncSession
|
||||
from .base_provider import AsyncGeneratorProvider
|
||||
|
||||
|
||||
class Aibn(AsyncGeneratorProvider):
|
||||
url = "https://aibn.cc"
|
||||
supports_gpt_35_turbo = True
|
||||
working = True
|
||||
|
||||
@classmethod
|
||||
async def create_async_generator(
|
||||
cls,
|
||||
model: str,
|
||||
messages: list[dict[str, str]],
|
||||
**kwargs
|
||||
) -> AsyncGenerator:
|
||||
async with AsyncSession(impersonate="chrome107") as session:
|
||||
timestamp = int(time.time())
|
||||
data = {
|
||||
"messages": messages,
|
||||
"pass": None,
|
||||
"sign": generate_signature(timestamp, messages[-1]["content"]),
|
||||
"time": timestamp
|
||||
}
|
||||
async with session.post(f"{cls.url}/api/generate", json=data) as response:
|
||||
response.raise_for_status()
|
||||
async for chunk in response.content.iter_any():
|
||||
yield chunk.decode()
|
||||
|
||||
@classmethod
|
||||
@property
|
||||
def params(cls):
|
||||
params = [
|
||||
("model", "str"),
|
||||
("messages", "list[dict[str, str]]"),
|
||||
("stream", "bool"),
|
||||
("temperature", "float"),
|
||||
]
|
||||
param = ", ".join([": ".join(p) for p in params])
|
||||
return f"g4f.provider.{cls.__name__} supports: ({param})"
|
||||
|
||||
|
||||
def generate_signature(timestamp: int, message: str, secret: str = "undefined"):
|
||||
data = f"{timestamp}:{message}:{secret}"
|
||||
return hashlib.sha256(data.encode()).hexdigest()
|
@ -0,0 +1,51 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from g4f.requests import AsyncSession
|
||||
from .base_provider import AsyncProvider, format_prompt
|
||||
|
||||
|
||||
class ChatgptDuo(AsyncProvider):
|
||||
url = "https://chatgptduo.com"
|
||||
supports_gpt_35_turbo = True
|
||||
working = True
|
||||
|
||||
@classmethod
|
||||
async def create_async(
|
||||
cls,
|
||||
model: str,
|
||||
messages: list[dict[str, str]],
|
||||
**kwargs
|
||||
) -> str:
|
||||
async with AsyncSession(impersonate="chrome107") as session:
|
||||
prompt = format_prompt(messages),
|
||||
data = {
|
||||
"prompt": prompt,
|
||||
"search": prompt,
|
||||
"purpose": "ask",
|
||||
}
|
||||
async with session.post(f"{cls.url}/", data=data) as response:
|
||||
response.raise_for_status()
|
||||
data = await response.json()
|
||||
|
||||
cls._sources = [{
|
||||
"title": source["title"],
|
||||
"url": source["link"],
|
||||
"snippet": source["snippet"]
|
||||
} for source in data["results"]]
|
||||
|
||||
return data["answer"]
|
||||
|
||||
@classmethod
|
||||
def get_sources(cls):
|
||||
return cls._sources
|
||||
|
||||
@classmethod
|
||||
@property
|
||||
def params(cls):
|
||||
params = [
|
||||
("model", "str"),
|
||||
("messages", "list[dict[str, str]]"),
|
||||
("stream", "bool"),
|
||||
]
|
||||
param = ", ".join([": ".join(p) for p in params])
|
||||
return f"g4f.provider.{cls.__name__} supports: ({param})"
|
@ -0,0 +1,54 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from asyncio import AbstractEventLoop
|
||||
|
||||
import browser_cookie3
|
||||
|
||||
_cookies: dict[str, dict[str, str]] = {}
|
||||
|
||||
# Use own event_loop_policy with a selector event loop on windows.
|
||||
if sys.platform == 'win32':
|
||||
_event_loop_policy = asyncio.WindowsSelectorEventLoopPolicy()
|
||||
else:
|
||||
_event_loop_policy = asyncio.get_event_loop_policy()
|
||||
|
||||
# If event loop is already running, handle nested event loops
|
||||
# If "nest_asyncio" is installed, patch the event loop.
|
||||
def get_event_loop() -> AbstractEventLoop:
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
return _event_loop_policy.get_event_loop()
|
||||
try:
|
||||
event_loop = _event_loop_policy.get_event_loop()
|
||||
if not hasattr(event_loop.__class__, "_nest_patched"):
|
||||
import nest_asyncio
|
||||
nest_asyncio.apply(event_loop)
|
||||
return event_loop
|
||||
except ImportError:
|
||||
raise RuntimeError(
|
||||
'Use "create_async" instead of "create" function in a running event loop. Or install the "nest_asyncio" package.')
|
||||
|
||||
# Load cookies for a domain from all supported browser.
|
||||
# Cache the results in the "_cookies" variable
|
||||
def get_cookies(cookie_domain: str) -> dict:
|
||||
if cookie_domain not in _cookies:
|
||||
_cookies[cookie_domain] = {}
|
||||
try:
|
||||
for cookie in browser_cookie3.load(cookie_domain):
|
||||
_cookies[cookie_domain][cookie.name] = cookie.value
|
||||
except:
|
||||
pass
|
||||
return _cookies[cookie_domain]
|
||||
|
||||
|
||||
def format_prompt(messages: list[dict[str, str]], add_special_tokens=False):
|
||||
if add_special_tokens or len(messages) > 1:
|
||||
formatted = "\n".join(
|
||||
["%s: %s" % ((message["role"]).capitalize(), message["content"]) for message in messages]
|
||||
)
|
||||
return f"{formatted}\nAssistant:"
|
||||
else:
|
||||
return messages[0]["content"]
|
Loading…
Reference in New Issue