【Elasticsearch】使用Python完成对ES的插入操作

这篇具有很好参考价值的文章主要介绍了【Elasticsearch】使用Python完成对ES的插入操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

实现思路

1.Python搭建Flask服务,编写ES脚本。
2.通过Java调用Python接口,完成对ES的插入操作。

环境配置

Elasticsearch 7.16.0

具体代码实现

ESObject模板

import json
from flask import Flask, request, jsonify, Response
import jieba
import time
import hashlib
import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

server = Flask(__name__)
server.config["JSON_AS_ASCII"] = False


class ESObject:
    def __init__(self, index_name, index_type, host='127.0.0.1', port=9300):
        self.host = host
        self.port = port
        self.index_name = index_name
        self.index_type = index_type

        self.es_obj = self.connect_elasticsearch()

    def set_index_name(self, index_name):
        self.index_name = index_name

    def set_index_type(self, index_type):
        self.index_type = index_type

    def connect_elasticsearch(self):
        """
        创建连接
        :return:
        """
        _es = None
        _es = Elasticsearch([{'host': self.host, 'port': self.port}], request_timeout=60, max_retries=3,
                            retry_on_timeout=True)
        if _es.ping():
            print('The connection is successful!')
        else:
            print("Error: ES could not connect!")
        return _es

    def create_index(self, settings):
        """
        创建索引(数据库)
        访问“http://localhost:9200/entities/_mappings”验证创建是否成功
        :return:
        """
        created = False
        try:
            if not self.es_obj.indices.exists(self.index_name):
                # 参数ignore = 400在检查后不再需要,因为这可以防止错误地覆盖现有索引
                self.es_obj.indices.create(index=self.index_name, ignore=400, body=settings)
                print("Created Index")
            created = True
        except Exception as ex:
            print(str(ex))
        finally:
            return created

    def delete_index(self):
        try:
            if self.es_obj.indices.exists(self.index_name):
                # 参数ignore 用来忽略 Index 不存在而删除失败导致程序中断的问题
                self.es_obj.indices.delete(index=self.index_name, ignore=[400, 404])
                print("Deleted Index")
        except Exception as ex:
            print(str(ex))

    def store_record(self, record):
        try:
            outcome = self.es_obj.index(index=self.index_name, doc_type=self.index_type, body=record)
            print(outcome['result'])
            return outcome
        except Exception as ex:
            print("Error in indexing data")
            print(str(ex))

    def store_record_list(self, record_list):
        for record in record_list:
            self.store_record(record)

    def bulk_index_data(self, record_list):
        """
        批量插入
        :param record_list:
        :return:
        """
        ACTIONS = []
        i = 1
        for record in record_list:
            action = {
                "_index": self.index_name,
                "_type": self.index_type,
                # "_id": i,  # _id 可以默认生成,不赋值
                "_source": record
            }
            i += 1
            ACTIONS.append(action)
        success, _ = bulk(self.es_obj, ACTIONS, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def get_data_by_id(self, id):
        res = self.es_obj.get(index=self.index_name, doc_type=self.index_type, id=id)
        return res['hits']

    def get_data_by_body(self, search):
        # res = self.es_obj.search(index=self.index_name, doc_type=self.index_type, body=search)
        res = self.es_obj.search(index=self.index_name, body=search)
        return res['hits']

    def update_data(self, id, body):
        res = self.es_obj.update(index=self.index_name, doc_type=self.index_type, id=id, body=body)
        return res

    def delete_type_data(self):
        query_object = {'query': {'match_all': {}}}
        res = self.es_obj.delete_by_query(index_name=self.index_name, doc_type=self.index_type, body=query_object)
        return res

    def delect_index_data(self, id):
        res = self.es_obj.delete(index=self.index_name, doc_type=self.index_type, id=id)
        return res

    def delete_by_query(self, query):
        res = self.es_obj.delete_by_query(index=self.index_name, doc_type=self.index_type, body=query)
        return res


def secret_key(length=30):
    """
    Generate secret key from alpha and digit.
    :param length: length of secret key.
    :return: [length] long secret key.
    """
    key = ''
    while length:
        key += random.choice(string.ascii_letters + string.digits)
        length -= 1
    return key


def hash_code(*args, **kwargs):
    """
    Generate 64-strings(in hashlib.sha256()) hash code.
    :param args: for any other position args packing.
    :param kwargs: for any other key-word args packing.
    :return: 64-strings long hash code.
    """
    text = ''
    if not args and not kwargs:
        text += time.strftime("%Y%m%d%H%M%s")
    if args:
        for arg in args:
            text += str(arg)
    if kwargs:
        for kwarg in kwargs:
            text += str(kwargs[kwarg])
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


if __name__ == '__main__':
    server.run(host='0.0.0.0', port=8660, debug=False)

插入函数及接口

def es_insert(datas):
    try:
        # 可以等同于 DataBase
        index_name = "index_name"
        # 可以等同于 Table
        index_type = "_doc"
		# ES对象
        es = ESObject(index_name, index_type, 'localhost', 9200)
		
        # # 删除索引
        # es.delete_index()
        #
        # # 建立索引
        # settings = {
        #     "settings": {
        #         "number_of_shards": 5,
        #         "number_of_replicas": 0
        #     },
        #     "index": {
        #         "refresh_interval": "20s"
        #     },
        #     "mappings": {
        #         index_type: {
        #             "dynamic": "strict",
        #             "properties": {
        #                 "es_id": {
        #                     "type": "text"
        #                 }, "content": {
        #                     "type": "text"
        #                 }, "file_name": {
        #                     "type": "text"
        #                 }, "jieba_content": {
        #                     "type": "text"
        #                 }
        #             }
        #         }
        #     }
        # }
        # es.create_index(settings)
		#插入操作
        record_list = []
        file_name = datas["filename"]
        for i, data in enumerate(datas["contentList"]):
            jieba_con_list = jieba.lcut(data)
            jieba_con_str = str.join(" ", jieba_con_list)
            es_id = secret_key()
            # print(es_id)
            # print(secret_key())
            record = {"es_id": es_id, "content": data, "file_name": file_name, "jieba_content": jieba_con_str}
            record_list.append(record)
            if len(record_list) >= 10000:
                start = time.time()
                es.bulk_index_data(record_list)
                print("Finished!")
                end = time.time()
                print(str(end - start))
                record_list = []
        print("record_list finished")
        start = time.time()
        es.bulk_index_data(record_list)
        print("success finished!")
        end = time.time()
        print(str(end - start))
        return "1"
    except Exception as e:
        print(e)
        return "0"


@server.route("/es_insert", methods=['get', 'post'])
def question_regex():
    if not request.data:
        return "fail"
     #获取接口调用传入的参数
    data = json.loads(request.data.decode("utf-8"))
    # print(data)
    res_code = es_insert(data)
    print(res_code)
    return Response(str(res_code))

拓展思路

ESObject是一个模板,其中有很多其他的函数。通过Java调用,还可以实现很多操作,如删除、查询等。文章来源地址https://www.toymoban.com/news/detail-557899.html

拓展删除操作示例

def es_delete_by_id(p_file_name):
    try:
        # 等同于 DataBase
        index_name = "index_name"
        # 等同于 Table
        index_type = "_doc"
        es = ESObject(index_name, index_type, 'localhost', 9200)
        ld_datas = es.get_data_by_body(10000)

        ll_hits = ld_datas['hits']
        ll_delete_list = []
        for i, d in enumerate(ll_hits):
            l_id = d['_id']
            l_file_name = d['_source']['file_name']
            if p_file_name == l_file_name:
                es.delete_index_data(l_id)
                ll_delete_list.append(l_file_name)
        print(list(set(ll_delete_list)))
        return "1"
    except Exception as e:
        print(e)
        return "0"


@server.route("/es_delete", methods=['get', 'post'])
def question_regex():
    if not request.data:
        return "fail"
    data = json.loads(request.data.decode("utf-8"))
    print(data)

    # filename = ''
    # l_res_code = es_delete_by_id(filename)
    l_delete_file_name = data["filename"]
    l_res_code = es_delete_by_id(l_delete_file_name)
    return Response(str(l_res_code))

到了这里,关于【Elasticsearch】使用Python完成对ES的插入操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 项目中使用es(一):使用springboot操作elasticsearch

    项目中使用es(一):使用springboot操作elasticsearch

    写在前面 对于elasticsearch的搭建,前面写了一篇文章有简单描述如何搭建es,本次主要介绍如何在项目里使用,主要使用ElasticsearchRepository和ElasticsearchRestTemplate操作es。 搭建项目环境和选择合适版本 首先选择合适的项目组件版本,因为es版本和springboot版本有对应,如果不合适会

    2024年02月08日
    浏览(12)
  • Java使用Maven工程操作OpenGL ES绘制三角形和圆形;绘制完成后操作键盘控制然图形移动

    Java使用Maven工程操作OpenGL ES绘制三角形和圆形;绘制完成后操作键盘控制然图形移动

    PS:想快速看到效果的小伙伴,可以在引入依赖后,先跳到完整代码部分 第一步:依赖引入 第二步:创建类,引入需要的包,设置全局参数 1.创建类 2. 包引入 3. 全局参数 第三步:定义一个初始化方法 init() 1. GLFW 错误信息的回调函数 这样做,在发生 GLFW 错误时,错误信息将

    2024年02月08日
    浏览(12)
  • 使用postman和es插件操作elasticsearch API

    使用postman和es插件操作elasticsearch API

    本文介绍了使用postman和es浏览器插件操作elasticsearch API的常用方法 本文使用的es浏览器插件时edge下的elasticvue,可以在edge的应用商店直接搜索安装,相较于es-head,这个插件一直在维护更新,使用还是很方便的     查看索引 查看索引主要使用get方法,可以查看单个or多个索引,

    2024年02月07日
    浏览(10)
  • Python语言,ES(Elasticsearch)基础查询

    https://blog.csdn.net/y472360651/article/details/76652021 https://www.cnblogs.com/bainianminguo/articles/12763099.html

    2024年02月11日
    浏览(11)
  • ES批量上传数据 - Python操作ES

    ES批量上传数据 - Python操作ES

    2024年02月11日
    浏览(7)
  • Springboot 整合 Elasticsearch(五):使用RestHighLevelClient操作ES ②

    Springboot 整合 Elasticsearch(五):使用RestHighLevelClient操作ES ②

    📁 前情提要: Springboot 整合 Elasticsearch(三):使用RestHighLevelClient操作ES ① 目录  一、Springboot 整合 Elasticsearch 1、RestHighLevelClient API介绍 1.1、全查询 分页 排序 1.2、单条件查询 1.2.1、termQuery 1.2.2、matchQuery 1.2.3、短语检索 1.3、组合查询 1.4、范围查询 1.5、模糊查询 1.6、分组

    2024年04月11日
    浏览(16)
  • Elasticsearch、Kibana以及Java操作ES 的快速使用

    Elasticsearch、Kibana以及Java操作ES 的快速使用

      创建docker自定义网络 docker自定义网络可以使得 容器之间使用容器名网络互连 ,默认的网络不会有这功能。 一定要配置自定义网络,并将两个容器同时加到网络中,否则下面的 http://es:9200 会无法访问到es   启动elastic search、kibana容器 启动 elastic search容器 访问 http://192.168

    2024年02月09日
    浏览(15)
  • Elasticsearch 7.X SpringBoot 使用 ElasticsearchRestTemplate 操作 ES

    Elasticsearch 7.X SpringBoot 使用 ElasticsearchRestTemplate 操作 ES

    前面学习了es rest接口对es进行操作的方式,并且还学习了es的分片及扩容,有讲解了几种常见的分词器,喜欢的小伙伴可以看下本专栏的其他文章,本篇主要将 在 SpringBoot 中使用 ElasticsearchRestTemplate 对ES进行操作。 对于SpringBoot对ES的操作早在以前我就写过一篇文章,但那时基

    2023年04月09日
    浏览(8)
  • Elasticsearch 基础操作与 ES-head 插件的使用

    Elasticsearch 基础操作与 ES-head 插件的使用

    Elasticsearch 安装请参考 搭建ELK日志管理平台 - - 2 ElasticSearch部署 主节点 :默认配置是1个分片1个副本 主数据分片 : 数据在分片中被分组存储,例如如果分片是3个的话,存入数据 hello 可能被拆分存储在这几个分片之中 副本数据分片 : 对数据分片的拷贝 集群健康值 : 黄色表示

    2023年04月16日
    浏览(12)
  • Python连接es笔记三之es更新操作

    Python连接es笔记三之es更新操作

    本文首发于公众号:Hunter后端 原文链接:Python连接es笔记三之es更新操作 这一篇笔记介绍如何使用 Python 对数据进行更新操作。 对于 es 的更新的操作,不用到 Search() 方法,而是直接使用 es 的连接加上相应的函数来操作,本篇笔记目录如下: 获取连接 update() update_by_query() 批

    2024年02月07日
    浏览(12)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包