目录
aws的上传、删除s3文件以及图像识别文字功能
准备工作
安装aws cli
根据自己的操作系统,下载相应的安装包安装。安装过程很简单,在此不再赘述。
在安装完成之后,运行以下两个命令来验证AWS CLI是否安装成功。参考以下示例,在MacOS上打开Terminal程序。如果是Windows系统,打开cmd。
- where aws / which aws 查看AWS CLI安装路径
- aws --version 查看AWS CLI版本
zonghan@MacBook-Pro ~ % aws --version
aws-cli/2.0.30 Python/3.7.4 Darwin/21.6.0 botocore/2.0.0dev34
zonghan@MacBook-Pro ~ % which aws
/usr/local/bin/aws
初始化配置AWS CLI
在使用AWS CLI前,可使用aws configure命令,完成初始化配置。
zonghan@MacBook-Pro ~ % aws configure
AWS Access Key ID [None]: AKIA3GRZL6WIQEXAMPLE
AWS Secret Access Key [None]: k+ci5r+hAcM3x61w1example
Default region name [None]: ap-east-1
Default output format [None]: json
AWS Access Key ID 及AWS Secret Access Key可在AWS管理控制台获取,AWS CLI将会使用此信息作为用户名、密码连接AWS服务。
点击AWS管理控制台右上角的用户名 --> 选择Security Credentials
- 点击Create New Access Key以创建一对Access Key ID 及Secret Access Key,并立即保存(Secret Access Key仅能在创建时查看和保存)
- Default region name,用以指定要连接的AWS 区域代码。每个AWS区域对应的代码可通过 此链接查找。
- Default output format,用以指定命令行输出内容的格式,默认使用JSON作为所有输出的格式。也可以使用以下任一格式:JSON(JavaScript Object Notation)、YAML(仅在 AWS CLI v2 版本中可用)、Text、Table。
更多详细的配置请看该文章
s3存储桶开通
该电脑配置的认证用户在aws的s3上有权限访问一个s3的存储桶,这个一般都是管理员给你开通
图像识别文字功能开通
该电脑配置的认证用户需要拥有aws的Amazon Textract的使用权限,这个一般都是管理员给你开通
aws的sdk
import boto3
from botocore.exceptions import ClientError, BotoCoreError
安装上述boto3的模块,一般会同时安装botocore模块
上传文件
方法一
使用upload_file方法来上传文件
import logging
import boto3
from botocore.exceptions import ClientError
import os
def upload_file(file_path, bucket, file_name=None):
    """Upload a local file to an S3 bucket with the managed ``upload_file`` API.

    :param file_path: Path of the local file to upload
    :param bucket: Name of the bucket to upload to
    :param file_name: S3 object key. If not specified, the base name of
        ``file_path`` is used
    :return: True if the file was uploaded, else False
    """
    # Imported locally so the snippet needs no extra top-level import.
    from boto3.exceptions import S3UploadFailedError

    # Default the object key to the local file's base name.
    if file_name is None:
        file_name = os.path.basename(file_path)

    s3_client = boto3.client('s3')
    # Equivalent resource-style call:
    #   boto3.resource('s3').Bucket(bucket).upload_file(file_path, file_name)
    try:
        s3_client.upload_file(file_path, bucket, file_name)
    except (ClientError, S3UploadFailedError) as e:
        # The managed transfer re-raises service errors as
        # S3UploadFailedError, so both types have to be handled here.
        logging.error(e)
        return False
    return True
方法二
使用PutObject来上传文件
import logging
import os
import boto3
from botocore.exceptions import ClientError, BotoCoreError
from django.conf import settings
from celery import shared_task
logger = logging.getLogger(__name__)
def upload_file_to_aws(file_path, bucket, file_name=None):
    """Upload a local file to an S3 bucket with a single PutObject request.

    :param file_path: Path of the local file to upload
    :param bucket: Name of the bucket to upload to
    :param file_name: S3 object key. If not specified, the base name of
        ``file_path`` is used
    :return: True if the file was uploaded, else False
    """
    # Default the object key to the local file's base name.
    if file_name is None:
        file_name = os.path.basename(file_path)

    s3 = boto3.resource('s3')
    try:
        with open(file_path, 'rb') as f:
            # Hand the open binary handle to boto3 instead of reading the
            # whole file into memory first.
            s3.Object(bucket, file_name).put(Body=f)
    except (BotoCoreError, ClientError) as e:
        # ClientError carries service-side failures (access denied, missing
        # bucket, ...); BotoCoreError carries client-side ones.
        logger.error(e)
        return False
    return True
删除文件
def delete_aws_file(file_name, bucket):
    """Delete one object from an S3 bucket on a best-effort basis.

    Failures are logged and never propagated to the caller.

    :param file_name: S3 object key to delete
    :param bucket: Name of the bucket that holds the object
    :return: None
    """
    try:
        s3_client = boto3.client("s3")
        s3_client.delete_object(Bucket=bucket, Key=file_name)
    except Exception:
        # Deliberately broad: deletion must not crash the caller. Log at
        # ERROR with traceback and target so the failure is not lost among
        # INFO messages.
        logger.exception("Failed to delete s3://%s/%s", bucket, file_name)
图像识别文字
识别发票、账单这类键值对(key-value)形式的内容
def get_labels_and_values(result, field):
    """Append the label/value text of one expense field to ``result``.

    The pair is appended as a single-entry dict ``{label: value}``. A
    trailing ":" on the label is removed. Fields that lack a label, a
    value, or non-empty text for either are skipped.

    :param result: List that collects the ``{label: value}`` entries
    :param field: Dict that may hold "LabelDetection" and "ValueDetection"
        sub-dicts, each with a "Text" entry
    :return: None; ``result`` is modified in place
    """
    # ``or {}`` covers both a missing key and an explicit None value.
    label = (field.get("LabelDetection") or {}).get("Text")
    value = (field.get("ValueDetection") or {}).get("Text")
    if not (label and value):
        return
    if label.endswith(":"):
        label = label[:-1]
    result.append({label: value})
def process_text_detection(bucket, document, region_name="ap-south-1"):
    """Run Textract expense analysis on an S3 document and collect its fields.

    :param bucket: Name of the S3 bucket that holds the document
    :param document: S3 object key of the document
    :param region_name: AWS region of the Textract endpoint
    :return: List of single-entry ``{label: value}`` dicts, line-item fields
        first and summary fields after them, per expense document
    :raises RuntimeError: If the Textract call fails for any reason
    """
    try:
        client = boto3.client("textract", region_name=region_name)
        response = client.analyze_expense(
            Document={"S3Object": {"Bucket": bucket, "Name": document}}
        )
    except Exception as e:
        logger.exception(
            "Textract analyze_expense failed for s3://%s/%s", bucket, document
        )
        # Python 3 can only raise exception instances, not a bare string.
        raise RuntimeError("An unknown error occurred on the aws service") from e

    # Must be a list: get_labels_and_values() appends to it.
    result = []
    for expense_doc in response["ExpenseDocuments"]:
        for line_item_group in expense_doc.get("LineItemGroups", []):
            for line_items in line_item_group.get("LineItems", []):
                for expense_fields in line_items.get("LineItemExpenseFields", []):
                    get_labels_and_values(result, expense_fields)
        for summary_field in expense_doc.get("SummaryFields", []):
            get_labels_and_values(result, summary_field)
    return result
def get_extract_info(bucket, document):
    """Return whatever process_text_detection() yields for the S3 document.

    :param bucket: Name of the S3 bucket that holds the document
    :param document: S3 object key of the document
    """
    extracted = process_text_detection(bucket, document)
    return extracted
单纯的识别文字
#Analyzes text in a document stored in an S3 bucket. Display polygon box around text and angled text
import boto3
import io
from io import BytesIO
import sys
import math
from PIL import Image, ImageDraw, ImageFont
def ShowBoundingBox(draw,box,width,height,boxColor):
    """Draw the outline of a bounding box on the image.

    The 'Left'/'Width' entries of ``box`` are scaled by ``width`` and the
    'Top'/'Height' entries by ``height`` to obtain the rectangle corners.
    """
    x0 = width * box['Left']
    y0 = height * box['Top']
    x1 = x0 + (width * box['Width'])
    y1 = y0 + (height * box['Height'])
    draw.rectangle([x0, y0, x1, y1], outline=boxColor)
def ShowSelectedElement(draw,box,width,height,boxColor):
    """Draw a filled rectangle over a bounding box on the image.

    The 'Left'/'Width' entries of ``box`` are scaled by ``width`` and the
    'Top'/'Height' entries by ``height`` to obtain the rectangle corners.
    """
    x0 = width * box['Left']
    y0 = height * box['Top']
    x1 = x0 + (width * box['Width'])
    y1 = y0 + (height * box['Height'])
    draw.rectangle([x0, y0, x1, y1], fill=boxColor)
# Displays information about a block returned by text detection and text analysis
def DisplayBlockInformation(block):
    """Print a human-readable summary of one Textract block to stdout.

    :param block: Block dict. 'Id', 'BlockType' and 'Geometry' are read
        unconditionally; 'Text', 'Confidence', 'Relationships' and 'Page'
        are printed only when present. CELL, KEY_VALUE_SET and
        SELECTION_ELEMENT blocks print their type-specific entries.
    """
    print('Id: {}'.format(block['Id']))
    if 'Text' in block:
        print(' Detected: ' + block['Text'])
    print(' Type: ' + block['BlockType'])

    if 'Confidence' in block:
        print(' Confidence: ' + "{:.2f}".format(block['Confidence']) + "%")

    if block['BlockType'] == 'CELL':
        print(" Cell information")
        print(" Column:" + str(block['ColumnIndex']))
        print(" Row:" + str(block['RowIndex']))
        print(" Column Span:" + str(block['ColumnSpan']))
        # Report the row span from its own entry, not from ColumnSpan.
        print(" RowSpan:" + str(block['RowSpan']))

    if 'Relationships' in block:
        print(' Relationships: {}'.format(block['Relationships']))
    print(' Geometry: ')
    print(' Bounding Box: {}'.format(block['Geometry']['BoundingBox']))
    print(' Polygon: {}'.format(block['Geometry']['Polygon']))

    if block['BlockType'] == "KEY_VALUE_SET":
        print (' Entity Type: ' + block['EntityTypes'][0])

    if block['BlockType'] == 'SELECTION_ELEMENT':
        print(' Selection element detected: ', end='')
        if block['SelectionStatus'] =='SELECTED':
            print('Selected')
        else:
            print('Not selected')

    if 'Page' in block:
        # str() keeps the concatenation valid when the page number is not
        # a string.
        print('Page: ' + str(block['Page']))
    print()
def process_text_analysis(bucket, document):
    """Analyze an S3-hosted image with Textract and show the annotated result.

    The image is downloaded from S3, sent to ``analyze_document`` with the
    TABLES and FORMS features, every returned block is printed, and coloured
    boxes are drawn on the image before it is displayed.

    :param bucket: Name of the S3 bucket that holds the image
    :param document: S3 object key of the image
    :return: Number of blocks returned by Textract
    """
    # Get the document from S3
    s3_connection = boto3.resource('s3')
    s3_object = s3_connection.Object(bucket, document)
    s3_response = s3_object.get()

    stream = io.BytesIO(s3_response['Body'].read())
    image = Image.open(stream)

    # Analyze the document
    client = boto3.client('textract')
    image_binary = stream.getvalue()
    response = client.analyze_document(Document={'Bytes': image_binary},
                                       FeatureTypes=["TABLES", "FORMS"])

    # Alternatively, let Textract read the S3 object directly:
    # response = client.analyze_document(
    #     Document={'S3Object': {'Bucket': bucket, 'Name': document}},
    #     FeatureTypes=["TABLES", "FORMS"])
    #
    # A local file also works: read its bytes in 'rb' mode and pass them as
    # Document={'Bytes': img_bytes}.

    # Get the text blocks
    blocks = response['Blocks']
    width, height = image.size
    # One drawing context serves every block; it is not re-created per block.
    draw = ImageDraw.Draw(image)
    print ('Detected Document Text')

    # Create image showing bounding box/polygon the detected lines/text
    for block in blocks:
        DisplayBlockInformation(block)
        block_type = block['BlockType']
        bounding_box = block['Geometry']['BoundingBox']

        if block_type == "KEY_VALUE_SET":
            # Keys are outlined in red, values in green.
            if block['EntityTypes'][0] == "KEY":
                ShowBoundingBox(draw, bounding_box, width, height, 'red')
            else:
                ShowBoundingBox(draw, bounding_box, width, height, 'green')
        elif block_type == 'TABLE':
            ShowBoundingBox(draw, bounding_box, width, height, 'blue')
        elif block_type == 'CELL':
            ShowBoundingBox(draw, bounding_box, width, height, 'yellow')
        elif block_type == 'SELECTION_ELEMENT':
            if block['SelectionStatus'] == 'SELECTED':
                ShowSelectedElement(draw, bounding_box, width, height, 'blue')

        # Uncomment to draw the polygon for all blocks:
        # points = []
        # for polygon in block['Geometry']['Polygon']:
        #     points.append((width * polygon['X'], height * polygon['Y']))
        # draw.polygon((points), outline='blue')

    # Display the image
    image.show()
    return len(blocks)
def main():
    """Run the text analysis on the configured S3 object and print the block count."""
    # Both values are empty placeholders in this example.
    s3_bucket = ''
    s3_key = ''
    detected = process_text_analysis(s3_bucket, s3_key)
    print(f"Blocks detected: {detected}")


if __name__ == "__main__":
    main()
标签:
留言评论