mirror of https://github.com/xtekky/gpt4free
Add create images to Bing (#1426)
Add create images from Bing Add FreeChatgpt Provider Fix Bard Providerpull/1438/head^2
parent
2e6399230c
commit
bee75be8e3
@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from aiohttp import ClientSession
|
||||
|
||||
from ..typing import AsyncResult, Messages
|
||||
from .base_provider import AsyncGeneratorProvider
|
||||
|
||||
|
||||
class FreeChatgpt(AsyncGeneratorProvider):
|
||||
url = "https://free.chatgpt.org.uk"
|
||||
working = True
|
||||
supports_gpt_35_turbo = True
|
||||
supports_gpt_4 = True
|
||||
supports_message_history = True
|
||||
|
||||
@classmethod
|
||||
async def create_async_generator(
|
||||
cls,
|
||||
model: str,
|
||||
messages: Messages,
|
||||
proxy: str = None,
|
||||
**kwargs
|
||||
) -> AsyncResult:
|
||||
if not model:
|
||||
model = "gpt-3.5-turbo"
|
||||
headers = {
|
||||
"User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/119.0",
|
||||
"Accept": "application/json, text/event-stream",
|
||||
"Accept-Language": "de,en-US;q=0.7,en;q=0.3",
|
||||
"Accept-Encoding": "gzip, deflate, br",
|
||||
"Content-Type": "application/json",
|
||||
"Referer": "https://free.chatgpt.org.uk/",
|
||||
"x-requested-with": "XMLHttpRequest",
|
||||
"Origin": "https://free.chatgpt.org.uk",
|
||||
"Sec-Fetch-Dest": "empty",
|
||||
"Sec-Fetch-Mode": "cors",
|
||||
"Sec-Fetch-Site": "same-origin",
|
||||
"Connection": "keep-alive",
|
||||
"Alt-Used": "free.chatgpt.org.uk",
|
||||
"Pragma": "no-cache",
|
||||
"Cache-Control": "no-cache",
|
||||
"TE": "trailers",
|
||||
}
|
||||
async with ClientSession(headers=headers) as session:
|
||||
data = {
|
||||
"messages": messages,
|
||||
"isAzure": False,
|
||||
"azureApiVersion": "2023-08-01-preview",
|
||||
"stream": True,
|
||||
"model": model,
|
||||
"temperature": 0.5,
|
||||
"presence_penalty": 0,
|
||||
"frequency_penalty": 0,
|
||||
"top_p": 1,
|
||||
"baseUrl": "/api/openai",
|
||||
"maxIterations": 10,
|
||||
"returnIntermediateSteps": True,
|
||||
"useTools": ["web-search", "calculator", "web-browser"],
|
||||
**kwargs
|
||||
}
|
||||
async with session.post(f"{cls.url}/api/langchain/tool/agent/nodejs", json=data, proxy=proxy) as response:
|
||||
response.raise_for_status()
|
||||
async for line in response.content:
|
||||
if line.startswith(b"data: "):
|
||||
data = json.loads(line[6:])
|
||||
if data["isSuccess"] and not data["isToolMessage"]:
|
||||
yield data["message"]
|
@ -0,0 +1,43 @@
|
||||
from aiohttp import ClientSession
|
||||
|
||||
|
||||
class Conversation():
|
||||
def __init__(self, conversationId: str, clientId: str, conversationSignature: str) -> None:
|
||||
self.conversationId = conversationId
|
||||
self.clientId = clientId
|
||||
self.conversationSignature = conversationSignature
|
||||
|
||||
async def create_conversation(session: ClientSession, proxy: str = None) -> Conversation:
|
||||
url = 'https://www.bing.com/turing/conversation/create?bundleVersion=1.1199.4'
|
||||
async with session.get(url, proxy=proxy) as response:
|
||||
data = await response.json()
|
||||
|
||||
conversationId = data.get('conversationId')
|
||||
clientId = data.get('clientId')
|
||||
conversationSignature = response.headers.get('X-Sydney-Encryptedconversationsignature')
|
||||
|
||||
if not conversationId or not clientId or not conversationSignature:
|
||||
raise Exception('Failed to create conversation.')
|
||||
return Conversation(conversationId, clientId, conversationSignature)
|
||||
|
||||
async def list_conversations(session: ClientSession) -> list:
|
||||
url = "https://www.bing.com/turing/conversation/chats"
|
||||
async with session.get(url) as response:
|
||||
response = await response.json()
|
||||
return response["chats"]
|
||||
|
||||
async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> list:
|
||||
url = "https://sydney.bing.com/sydney/DeleteSingleConversation"
|
||||
json = {
|
||||
"conversationId": conversation.conversationId,
|
||||
"conversationSignature": conversation.conversationSignature,
|
||||
"participant": {"id": conversation.clientId},
|
||||
"source": "cib",
|
||||
"optionsSets": ["autosave"]
|
||||
}
|
||||
try:
|
||||
async with session.post(url, json=json, proxy=proxy) as response:
|
||||
response = await response.json()
|
||||
return response["result"]["value"] == "Success"
|
||||
except:
|
||||
return False
|
@ -0,0 +1,164 @@
|
||||
import asyncio
|
||||
import time, json, os
|
||||
from aiohttp import ClientSession
|
||||
from bs4 import BeautifulSoup
|
||||
from urllib.parse import quote
|
||||
from typing import Generator
|
||||
|
||||
from ..create_images import CreateImagesProvider
|
||||
from ..helper import get_cookies, get_event_loop
|
||||
from ...webdriver import WebDriver, get_driver_cookies, get_browser
|
||||
from ...base_provider import ProviderType
|
||||
|
||||
BING_URL = "https://www.bing.com"
|
||||
|
||||
def wait_for_login(driver: WebDriver, timeout: int = 1200) -> None:
|
||||
driver.get(f"{BING_URL}/")
|
||||
value = driver.get_cookie("_U")
|
||||
if value:
|
||||
return
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if time.time() - start_time > timeout:
|
||||
raise RuntimeError("Timeout error")
|
||||
value = driver.get_cookie("_U")
|
||||
if value:
|
||||
return
|
||||
time.sleep(0.5)
|
||||
|
||||
def create_session(cookies: dict) -> ClientSession:
|
||||
headers = {
|
||||
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
|
||||
"accept-encoding": "gzip, deflate, br",
|
||||
"accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6",
|
||||
"content-type": "application/x-www-form-urlencoded",
|
||||
"referrer-policy": "origin-when-cross-origin",
|
||||
"referrer": "https://www.bing.com/images/create/",
|
||||
"origin": "https://www.bing.com",
|
||||
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54",
|
||||
"sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
|
||||
"sec-ch-ua-mobile": "?0",
|
||||
"sec-fetch-dest": "document",
|
||||
"sec-fetch-mode": "navigate",
|
||||
"sec-fetch-site": "same-origin",
|
||||
"sec-fetch-user": "?1",
|
||||
"upgrade-insecure-requests": "1",
|
||||
}
|
||||
if cookies:
|
||||
headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
|
||||
return ClientSession(headers=headers)
|
||||
|
||||
async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 300) -> list:
|
||||
url_encoded_prompt = quote(prompt)
|
||||
payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
|
||||
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
|
||||
async with session.post(
|
||||
url,
|
||||
allow_redirects=False,
|
||||
data=payload,
|
||||
timeout=timeout,
|
||||
) as response:
|
||||
response.raise_for_status()
|
||||
errors = [
|
||||
"this prompt is being reviewed",
|
||||
"this prompt has been blocked",
|
||||
"we're working hard to offer image creator in more languages"
|
||||
]
|
||||
text = (await response.text()).lower()
|
||||
for error in errors:
|
||||
if error in text:
|
||||
raise RuntimeError(f"Create images failed: {error}")
|
||||
if response.status != 302:
|
||||
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
|
||||
async with session.post(url, allow_redirects=False, proxy=proxy, timeout=timeout) as response:
|
||||
if response.status != 302:
|
||||
raise RuntimeError(f"Create images failed. Status Code: {response.status}")
|
||||
|
||||
redirect_url = response.headers["Location"].replace("&nfy=1", "")
|
||||
redirect_url = f"{BING_URL}{redirect_url}"
|
||||
request_id = redirect_url.split("id=")[1]
|
||||
async with session.get(redirect_url) as response:
|
||||
response.raise_for_status()
|
||||
|
||||
polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if time.time() - start_time > timeout:
|
||||
raise RuntimeError(f"Timeout error after {timeout} seconds")
|
||||
async with session.get(polling_url) as response:
|
||||
if response.status != 200:
|
||||
raise RuntimeError(f"Polling images faild. Status Code: {response.status}")
|
||||
text = await response.text()
|
||||
if not text:
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
break
|
||||
error = None
|
||||
try:
|
||||
error = json.loads(text).get("errorMessage")
|
||||
except:
|
||||
pass
|
||||
if error == "Pending":
|
||||
raise RuntimeError("Prompt is been blocked")
|
||||
elif error:
|
||||
raise RuntimeError(error)
|
||||
return read_images(text)
|
||||
|
||||
def read_images(text: str) -> list:
|
||||
html_soup = BeautifulSoup(text, "html.parser")
|
||||
tags = html_soup.find_all("img")
|
||||
image_links = [img["src"] for img in tags if "mimg" in img["class"]]
|
||||
images = [link.split("?w=")[0] for link in image_links]
|
||||
bad_images = [
|
||||
"https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
|
||||
"https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
|
||||
]
|
||||
if any(im in bad_images for im in images):
|
||||
raise RuntimeError("Bad images found")
|
||||
if not images:
|
||||
raise RuntimeError("No images found")
|
||||
return images
|
||||
|
||||
def format_images_markdown(images: list, prompt: str) -> str:
|
||||
images = [f"[![#{idx+1} {prompt}]({image}?w=200&h=200)]({image})" for idx, image in enumerate(images)]
|
||||
images = "\n".join(images)
|
||||
start_flag = "<!-- generated images start -->\n"
|
||||
end_flag = "<!-- generated images end -->\n"
|
||||
return f"\n{start_flag}{images}\n{end_flag}\n"
|
||||
|
||||
async def create_images_markdown(cookies: dict, prompt: str, proxy: str = None) -> str:
|
||||
session = create_session(cookies)
|
||||
try:
|
||||
images = await create_images(session, prompt, proxy)
|
||||
return format_images_markdown(images, prompt)
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
def get_cookies_from_browser(proxy: str = None) -> dict:
|
||||
driver = get_browser(proxy=proxy)
|
||||
try:
|
||||
wait_for_login(driver)
|
||||
return get_driver_cookies(driver)
|
||||
finally:
|
||||
driver.quit()
|
||||
|
||||
def create_completion(prompt: str, cookies: dict = None, proxy: str = None) -> Generator:
|
||||
loop = get_event_loop()
|
||||
if not cookies:
|
||||
cookies = get_cookies(".bing.com")
|
||||
if "_U" not in cookies:
|
||||
login_url = os.environ.get("G4F_LOGIN_URL")
|
||||
if login_url:
|
||||
yield f"Please login: [Bing]({login_url})\n\n"
|
||||
cookies = get_cookies_from_browser(proxy)
|
||||
yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy))
|
||||
|
||||
async def create_async(prompt: str, cookies: dict = None, proxy: str = None) -> str:
|
||||
if not cookies:
|
||||
cookies = get_cookies(".bing.com")
|
||||
if "_U" not in cookies:
|
||||
cookies = get_cookies_from_browser(proxy)
|
||||
return await create_images_markdown(cookies, prompt, proxy)
|
||||
|
||||
def patch_provider(provider: ProviderType) -> CreateImagesProvider:
|
||||
return CreateImagesProvider(provider, create_completion, create_async)
|
@ -0,0 +1,162 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import string
|
||||
import random
|
||||
import json
|
||||
import re
|
||||
import io
|
||||
import base64
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from aiohttp import ClientSession
|
||||
|
||||
async def upload_image(
|
||||
session: ClientSession,
|
||||
image: str,
|
||||
tone: str,
|
||||
proxy: str = None
|
||||
):
|
||||
try:
|
||||
image_config = {
|
||||
"maxImagePixels": 360000,
|
||||
"imageCompressionRate": 0.7,
|
||||
"enableFaceBlurDebug": 0,
|
||||
}
|
||||
is_data_uri_an_image(image)
|
||||
img_binary_data = extract_data_uri(image)
|
||||
is_accepted_format(img_binary_data)
|
||||
img = Image.open(io.BytesIO(img_binary_data))
|
||||
width, height = img.size
|
||||
max_image_pixels = image_config['maxImagePixels']
|
||||
if max_image_pixels / (width * height) < 1:
|
||||
new_width = int(width * np.sqrt(max_image_pixels / (width * height)))
|
||||
new_height = int(height * np.sqrt(max_image_pixels / (width * height)))
|
||||
else:
|
||||
new_width = width
|
||||
new_height = height
|
||||
try:
|
||||
orientation = get_orientation(img)
|
||||
except Exception:
|
||||
orientation = None
|
||||
new_img = process_image(orientation, img, new_width, new_height)
|
||||
new_img_binary_data = compress_image_to_base64(new_img, image_config['imageCompressionRate'])
|
||||
data, boundary = build_image_upload_api_payload(new_img_binary_data, tone)
|
||||
headers = session.headers.copy()
|
||||
headers["content-type"] = f'multipart/form-data; boundary={boundary}'
|
||||
headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
|
||||
headers["origin"] = 'https://www.bing.com'
|
||||
async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
|
||||
if response.status != 200:
|
||||
raise RuntimeError("Failed to upload image.")
|
||||
image_info = await response.json()
|
||||
if not image_info.get('blobId'):
|
||||
raise RuntimeError("Failed to parse image info.")
|
||||
result = {'bcid': image_info.get('blobId', "")}
|
||||
result['blurredBcid'] = image_info.get('processedBlobId', "")
|
||||
if result['blurredBcid'] != "":
|
||||
result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
|
||||
elif result['bcid'] != "":
|
||||
result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
|
||||
result['originalImageUrl'] = (
|
||||
"https://www.bing.com/images/blob?bcid="
|
||||
+ result['blurredBcid']
|
||||
if image_config["enableFaceBlurDebug"]
|
||||
else "https://www.bing.com/images/blob?bcid="
|
||||
+ result['bcid']
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Upload image failed: {e}")
|
||||
|
||||
|
||||
def build_image_upload_api_payload(image_bin: str, tone: str):
|
||||
payload = {
|
||||
'invokedSkills': ["ImageById"],
|
||||
'subscriptionId': "Bing.Chat.Multimodal",
|
||||
'invokedSkillsRequestData': {
|
||||
'enableFaceBlur': True
|
||||
},
|
||||
'convoData': {
|
||||
'convoid': "",
|
||||
'convotone': tone
|
||||
}
|
||||
}
|
||||
knowledge_request = {
|
||||
'imageInfo': {},
|
||||
'knowledgeRequest': payload
|
||||
}
|
||||
boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
|
||||
data = (
|
||||
f'--{boundary}'
|
||||
+ '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n'
|
||||
+ json.dumps(knowledge_request, ensure_ascii=False)
|
||||
+ "\r\n--"
|
||||
+ boundary
|
||||
+ '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n'
|
||||
+ image_bin
|
||||
+ "\r\n--"
|
||||
+ boundary
|
||||
+ "--\r\n"
|
||||
)
|
||||
return data, boundary
|
||||
|
||||
def is_data_uri_an_image(data_uri: str):
|
||||
# Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
|
||||
if not re.match(r'data:image/(\w+);base64,', data_uri):
|
||||
raise ValueError("Invalid data URI image.")
|
||||
# Extract the image format from the data URI
|
||||
image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
|
||||
# Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
|
||||
if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
|
||||
raise ValueError("Invalid image format (from mime file type).")
|
||||
|
||||
def is_accepted_format(binary_data: bytes) -> bool:
|
||||
if binary_data.startswith(b'\xFF\xD8\xFF'):
|
||||
pass # It's a JPEG image
|
||||
elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
|
||||
pass # It's a PNG image
|
||||
elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
|
||||
pass # It's a GIF image
|
||||
elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
|
||||
pass # It's a JPEG image
|
||||
elif binary_data.startswith(b'\xFF\xD8'):
|
||||
pass # It's a JPEG image
|
||||
elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
|
||||
pass # It's a WebP image
|
||||
else:
|
||||
raise ValueError("Invalid image format (from magic code).")
|
||||
|
||||
def extract_data_uri(data_uri: str) -> bytes:
|
||||
data = data_uri.split(",")[1]
|
||||
data = base64.b64decode(data)
|
||||
return data
|
||||
|
||||
def get_orientation(data: bytes) -> int:
|
||||
if data[:2] != b'\xFF\xD8':
|
||||
raise Exception('NotJpeg')
|
||||
with Image.open(data) as img:
|
||||
exif_data = img._getexif()
|
||||
if exif_data is not None:
|
||||
orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF
|
||||
if orientation is not None:
|
||||
return orientation
|
||||
|
||||
def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
|
||||
# Initialize the canvas
|
||||
new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
|
||||
if orientation:
|
||||
if orientation > 4:
|
||||
img = img.transpose(Image.FLIP_LEFT_RIGHT)
|
||||
if orientation in [3, 4]:
|
||||
img = img.transpose(Image.ROTATE_180)
|
||||
if orientation in [5, 6]:
|
||||
img = img.transpose(Image.ROTATE_270)
|
||||
if orientation in [7, 8]:
|
||||
img = img.transpose(Image.ROTATE_90)
|
||||
new_img.paste(img, (0, 0))
|
||||
return new_img
|
||||
|
||||
def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str:
|
||||
output_buffer = io.BytesIO()
|
||||
image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
|
||||
return base64.b64encode(output_buffer.getvalue()).decode('utf-8')
|
@ -0,0 +1,84 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import asyncio
|
||||
from ..typing import CreateResult, Messages
|
||||
from ..base_provider import BaseProvider, ProviderType
|
||||
|
||||
system_message = """
|
||||
You can generate custom images with the DALL-E 3 image generator.
|
||||
To generate a image with a prompt, do this:
|
||||
<img data-prompt=\"keywords for the image\">
|
||||
Don't use images with data uri. It is important to use a prompt instead.
|
||||
<img data-prompt=\"image caption\">
|
||||
"""
|
||||
|
||||
class CreateImagesProvider(BaseProvider):
|
||||
def __init__(
|
||||
self,
|
||||
provider: ProviderType,
|
||||
create_images: callable,
|
||||
create_async: callable,
|
||||
system_message: str = system_message,
|
||||
include_placeholder: bool = True
|
||||
) -> None:
|
||||
self.provider = provider
|
||||
self.create_images = create_images
|
||||
self.create_images_async = create_async
|
||||
self.system_message = system_message
|
||||
self.__name__ = provider.__name__
|
||||
self.working = provider.working
|
||||
self.supports_stream = provider.supports_stream
|
||||
self.include_placeholder = include_placeholder
|
||||
if hasattr(provider, "url"):
|
||||
self.url = provider.url
|
||||
|
||||
def create_completion(
|
||||
self,
|
||||
model: str,
|
||||
messages: Messages,
|
||||
stream: bool = False,
|
||||
**kwargs
|
||||
) -> CreateResult:
|
||||
messages.insert(0, {"role": "system", "content": self.system_message})
|
||||
buffer = ""
|
||||
for chunk in self.provider.create_completion(model, messages, stream, **kwargs):
|
||||
if buffer or "<" in chunk:
|
||||
buffer += chunk
|
||||
if ">" in buffer:
|
||||
match = re.search(r'<img data-prompt="(.*?)">', buffer)
|
||||
if match:
|
||||
placeholder, prompt = match.group(0), match.group(1)
|
||||
start, append = buffer.split(placeholder, 1)
|
||||
if start:
|
||||
yield start
|
||||
if self.include_placeholder:
|
||||
yield placeholder
|
||||
yield from self.create_images(prompt)
|
||||
if append:
|
||||
yield append
|
||||
else:
|
||||
yield buffer
|
||||
buffer = ""
|
||||
else:
|
||||
yield chunk
|
||||
|
||||
async def create_async(
|
||||
self,
|
||||
model: str,
|
||||
messages: Messages,
|
||||
**kwargs
|
||||
) -> str:
|
||||
messages.insert(0, {"role": "system", "content": self.system_message})
|
||||
response = await self.provider.create_async(model, messages, **kwargs)
|
||||
matches = re.findall(r'(<img data-prompt="(.*?)">)', result)
|
||||
results = []
|
||||
for _, prompt in matches:
|
||||
results.append(self.create_images_async(prompt))
|
||||
results = await asyncio.gather(*results)
|
||||
for idx, result in enumerate(results):
|
||||
placeholder = matches[idx][0]
|
||||
if self.include_placeholder:
|
||||
result = placeholder + result
|
||||
response = response.replace(placeholder, result)
|
||||
return result
|
Loading…
Reference in New Issue