实现思路
1. 使用 Python 搭建 Flask 服务,编写操作 ES 的脚本。
2. 通过 Java 以 HTTP 方式调用该 Python 接口,完成对 ES 的插入操作。
环境配置
Elasticsearch 7.16.0
文章来源:https://www.toymoban.com/news/detail-557899.html
具体代码实现
ESObject模板
import hashlib
import json
import random
import secrets
import string
import time

import jieba
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from flask import Flask, request, jsonify, Response
server = Flask(__name__)
# Emit non-ASCII (e.g. Chinese) text as-is in JSON responses rather than as \u escapes.
# NOTE(review): newer Flask releases replaced this config key with
# `server.json.ensure_ascii`; confirm against the installed Flask version.
server.config.update(JSON_AS_ASCII=False)
class ESObject:
    """Small wrapper around an ``Elasticsearch`` client bound to one index.

    ``index_name`` plays the role of a database and ``index_type`` that of a
    table (the callers in this file always use ``"_doc"``). The client is
    created, and the cluster pinged, when the object is constructed.
    """

    def __init__(self, index_name, index_type, host='127.0.0.1', port=9300):
        # NOTE(review): 9300 is usually Elasticsearch's transport port, while
        # this HTTP client normally needs 9200; the callers in this file pass
        # 9200 explicitly. The default is kept for backward compatibility.
        self.host = host
        self.port = port
        self.index_name = index_name
        self.index_type = index_type
        self.es_obj = self.connect_elasticsearch()

    def set_index_name(self, index_name):
        """Target *index_name* in subsequent calls."""
        self.index_name = index_name

    def set_index_type(self, index_type):
        """Use *index_type* as the document type in subsequent calls."""
        self.index_type = index_type

    def connect_elasticsearch(self):
        """Create the client and print whether the cluster answered a ping.

        :return: the ``Elasticsearch`` client. It is returned even when the
            ping fails, so callers only learn about the failure from the log.
        """
        es = Elasticsearch([{'host': self.host, 'port': self.port}], request_timeout=60, max_retries=3,
                           retry_on_timeout=True)
        if es.ping():
            print('The connection is successful!')
        else:
            print("Error: ES could not connect!")
        return es

    def create_index(self, settings):
        """Create the index (the "database") with *settings* unless it already exists.

        The result can be checked at e.g.
        ``http://localhost:9200/<index_name>/_mappings``.

        :param settings: request body for the create-index API.
        :return: True if the create call was issued, False if the index already
            existed or an error occurred (the error is printed, not raised).
        """
        try:
            if self.es_obj.indices.exists(self.index_name):
                return False
            # ignore=400 suppresses HTTP 400 replies such as "index already
            # exists" if another process creates it between exists() and here.
            self.es_obj.indices.create(index=self.index_name, ignore=400, body=settings)
        except Exception as ex:
            # No `return` in a `finally` any more: that used to swallow every
            # exception, including KeyboardInterrupt/SystemExit.
            print(str(ex))
            return False
        print("Created Index")
        return True

    def delete_index(self):
        """Delete the whole index if it exists; errors are printed, not raised."""
        try:
            if self.es_obj.indices.exists(self.index_name):
                # ignore=[400, 404] keeps an already-missing index from raising.
                self.es_obj.indices.delete(index=self.index_name, ignore=[400, 404])
                print("Deleted Index")
        except Exception as ex:
            print(str(ex))

    def store_record(self, record):
        """Index a single document *record*.

        :return: the index API response, or None if indexing raised (the error
            is printed).
        """
        try:
            outcome = self.es_obj.index(index=self.index_name, doc_type=self.index_type, body=record)
            print(outcome['result'])
            return outcome
        except Exception as ex:
            print("Error in indexing data")
            print(str(ex))
            return None

    def store_record_list(self, record_list):
        """Index each record in *record_list* with its own request."""
        for record in record_list:
            self.store_record(record)

    def bulk_index_data(self, record_list):
        """Index all of *record_list* through the bulk helper.

        ``_id`` is not set, so Elasticsearch generates one per document.
        Errors raise (``raise_on_error=True``).
        """
        actions = [
            {"_index": self.index_name, "_type": self.index_type, "_source": record}
            for record in record_list
        ]
        success, _ = bulk(self.es_obj, actions, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def get_data_by_id(self, id):
        """Fetch one document by its ``_id``.

        :return: the get API response (contains ``_id``, ``found``, ``_source``).
        """
        # A get response has no 'hits' key (that belongs to search responses),
        # so the previous `res['hits']` always raised KeyError.
        return self.es_obj.get(index=self.index_name, doc_type=self.index_type, id=id)

    def get_data_by_body(self, search):
        """Run the search request body *search* and return its ``hits`` section."""
        res = self.es_obj.search(index=self.index_name, body=search)
        return res['hits']

    def update_data(self, id, body):
        """Apply the update request *body* to the document with ``_id`` *id*."""
        return self.es_obj.update(index=self.index_name, doc_type=self.index_type, id=id, body=body)

    def delete_type_data(self):
        """Delete every document of the current type in the index."""
        query_object = {'query': {'match_all': {}}}
        # delete_by_query takes `index=`; the former `index_name=` keyword is
        # not a parameter of that API.
        return self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query_object)

    def delete_index_data(self, id):
        """Delete the single document with ``_id`` *id*."""
        return self.es_obj.delete(index=self.index_name, doc_type=self.index_type, id=id)

    # Original misspelled name, kept so existing callers keep working.
    delect_index_data = delete_index_data

    def delete_by_query(self, query):
        """Delete every document matched by the query body *query*."""
        return self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query)
def secret_key(length=30):
    """Generate a random key made of ASCII letters and digits.

    Characters are drawn with the ``secrets`` module, so the result is
    suitable for security-sensitive use.

    :param length: number of characters; zero or negative gives ``''``
        (the previous ``while length:`` loop never ended for negative values).
    :return: a string of ``max(length, 0)`` alphanumeric characters.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
def hash_code(*args, **kwargs):
    """Return the SHA-256 hex digest (64 characters) of the given values.

    Positional values are stringified and concatenated in order, followed by
    the keyword values in insertion order (keyword names are ignored). With
    no arguments at all, the current local time (to the second) is hashed.

    :param args: values to include in the hash.
    :param kwargs: more values to include; only the values are used.
    :return: 64-character lowercase hexadecimal string.
    """
    if not args and not kwargs:
        # %S is seconds. The previous %s is not a portable strftime directive
        # (platform-dependent: epoch seconds on glibc, rejected on Windows).
        text = time.strftime("%Y%m%d%H%M%S")
    else:
        text = ''.join(str(a) for a in args) + ''.join(str(v) for v in kwargs.values())
    return hashlib.sha256(text.encode("utf-8")).hexdigest()
if __name__ == '__main__':
    # NOTE(review): server.run() blocks until the server stops, so any
    # @server.route decorator placed further down this module has not run yet
    # when the server starts. Define the routes above this guard.
    server.run(
        host='0.0.0.0',
        port=8660,
        debug=False,
    )
插入函数及接口(注意:以下函数与路由需定义在上文 `if __name__ == '__main__':` 代码块之前;server.run() 会阻塞,写在其后的路由在服务启动时尚未注册)
def es_insert(datas, batch_size=10000):
    """Segment each text with jieba and bulk-index it into Elasticsearch.

    :param datas: decoded request payload. Only the keys ``"filename"`` and
        ``"contentList"`` are read. ``contentList`` is presumably a list of
        strings, since each item is passed to ``jieba.lcut``; confirm with the
        Java caller.
    :param batch_size: maximum number of records per bulk request. Defaults to
        the previously hard-coded 10000.
    :return: ``"1"`` on success, ``"0"`` if anything raised (the error is printed).
    """
    try:
        # Comparable to a database.
        index_name = "index_name"
        # Comparable to a table.
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)

        # One-off index setup, left disabled. The body is shaped for
        # Elasticsearch 7: index settings such as refresh_interval go under
        # "settings", and "mappings" holds the properties directly. Typed
        # mappings like {"_doc": {...}} were removed in 7.x.
        # es.delete_index()
        # settings = {
        #     "settings": {
        #         "number_of_shards": 5,
        #         "number_of_replicas": 0,
        #         "refresh_interval": "20s",
        #     },
        #     "mappings": {
        #         "dynamic": "strict",
        #         "properties": {
        #             "es_id": {"type": "text"},
        #             "content": {"type": "text"},
        #             "file_name": {"type": "text"},
        #             "jieba_content": {"type": "text"},
        #         },
        #     },
        # }
        # es.create_index(settings)

        file_name = datas["filename"]
        record_list = []
        for content in datas["contentList"]:
            record_list.append({
                "es_id": secret_key(),
                "content": content,
                "file_name": file_name,
                # The same text as space-separated jieba tokens.
                "jieba_content": " ".join(jieba.lcut(content)),
            })
            if len(record_list) >= batch_size:
                start = time.time()
                es.bulk_index_data(record_list)
                print("Finished!")
                end = time.time()
                print(str(end - start))
                record_list = []
        print("record_list finished")
        # Send the remainder. Skipped when everything was already flushed above
        # (or there was no content), so no empty bulk request is made.
        if record_list:
            start = time.time()
            es.bulk_index_data(record_list)
            print("success finished!")
            end = time.time()
            print(str(end - start))
        return "1"
    except Exception as e:
        print(e)
        return "0"
@server.route("/es_insert", methods=['get', 'post'], endpoint="es_insert")
def question_regex():
    """HTTP entry point that inserts the posted documents into Elasticsearch.

    The request body must be UTF-8 JSON (see ``es_insert`` for the keys read).
    Responds with ``"fail"`` for an empty or undecodable body, otherwise with
    the ``"1"``/``"0"`` status returned by ``es_insert``.
    """
    # An explicit endpoint is needed: the delete route's view function has the
    # same name, and Flask refuses to map two different functions to one
    # endpoint (the default endpoint is the function name).
    if not request.data:
        return "fail"
    # Read the parameters sent by the caller.
    try:
        data = json.loads(request.data.decode("utf-8"))
    except ValueError:
        # Covers both json.JSONDecodeError and UnicodeDecodeError.
        return "fail"
    res_code = es_insert(data)
    print(res_code)
    return Response(str(res_code))
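拓展思路
ESObject 是一个模板,其中还封装了很多其他函数。通过 Java 调用相应接口,还可以实现删除、查询等更多操作。
文章来源地址:https://www.toymoban.com/news/detail-557899.html
拓展删除操作示例(注意:若与上面的插入接口放在同一个文件中,两个视图函数不能共用同一个 endpoint,而 endpoint 默认就是函数名 question_regex。需改用不同的函数名或显式指定 endpoint,否则 Flask 在注册路由时会报错)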
def es_delete_by_id(p_file_name, max_hits=10000):
    """Delete every document whose ``file_name`` equals *p_file_name*.

    Candidate documents are found with a phrase query and then compared for
    exact equality, so only exact file-name matches are deleted.

    :param p_file_name: file name whose documents should be removed.
    :param max_hits: maximum number of candidates fetched in one search. The
        default of 10000 matches Elasticsearch's default result window.
        Matches beyond it are not deleted by a single call.
    :return: ``"1"`` on success, ``"0"`` if anything raised (the error is printed).
    """
    try:
        # Comparable to a database.
        index_name = "index_name"
        # Comparable to a table.
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)
        # The previous code passed the bare integer 10000 as the search body,
        # which is not a valid query. Narrow the search on the server side,
        # then re-check equality below; file_name is presumably an analysed
        # text field, so the phrase query alone may also match longer names.
        search = {
            "size": max_hits,
            "query": {"match_phrase": {"file_name": p_file_name}},
        }
        hits = es.get_data_by_body(search)['hits']
        deleted_names = set()
        for hit in hits:
            if hit['_source'].get('file_name') == p_file_name:
                # ESObject exposes single-document deletion as
                # `delect_index_data`; the former `delete_index_data` call raised
                # AttributeError, so nothing was ever deleted.
                es.delect_index_data(hit['_id'])
                deleted_names.add(p_file_name)
        print(list(deleted_names))
        return "1"
    except Exception as e:
        print(e)
        return "0"
@server.route("/es_delete", methods=['get', 'post'], endpoint="es_delete")
def question_regex():
    """HTTP entry point that deletes all documents for the posted file name.

    Expects a UTF-8 JSON object with a ``"filename"`` key. Responds with
    ``"fail"`` for an empty, undecodable or incomplete body, otherwise with
    the ``"1"``/``"0"`` status returned by ``es_delete_by_id``.
    """
    # An explicit endpoint is needed: the insert route's view function has the
    # same name, and Flask refuses to map two different functions to one
    # endpoint (the default endpoint is the function name).
    if not request.data:
        return "fail"
    try:
        data = json.loads(request.data.decode("utf-8"))
    except ValueError:
        # Covers both json.JSONDecodeError and UnicodeDecodeError.
        return "fail"
    print(data)
    # A missing key used to surface as a 500 via KeyError.
    l_delete_file_name = data.get("filename") if isinstance(data, dict) else None
    if l_delete_file_name is None:
        return "fail"
    l_res_code = es_delete_by_id(l_delete_file_name)
    return Response(str(l_res_code))
到了这里,关于【Elasticsearch】使用Python完成对ES的插入操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!