mysql数据导入elasticsearch

Ikko Lv4

首先用 curl 创建索引(运行前将 `密码`、`IP`、`索引名` 替换为实际值):

# Create the index with a single primary shard; both text fields are analysed
# with the IK plugin's ik_max_word analyzer. Replace 密码 / IP / 索引名 first.
curl -X PUT 'IP:9200/索引名' \
  -u 'elastic:密码' \
  -H 'Content-Type: application/json' \
  -d '{
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "policy_title": { "type": "text", "analyzer": "ik_max_word" },
      "policy_body":  { "type": "text", "analyzer": "ik_max_word" }
    }
  }
}'

也可以用 Python 创建同样的索引:

import pymysql
from elasticsearch import Elasticsearch
import json

import traceback
from elasticsearch import helpers
# Elasticsearch client (host URL left blank here; fill it in before running).
es = Elasticsearch(hosts=[''])
index_name = 'policyend'

# Field definition shared by both text columns: analysed with the IK
# plugin's ik_max_word analyzer.
ik_text = {'type': 'text', 'analyzer': 'ik_max_word'}

index_settings = {
    'number_of_shards': 3,
    'number_of_replicas': 1,
}
index_mappings = {
    'properties': {
        'policy_title': dict(ik_text),
        'policy_body': dict(ik_text),
    }
}

# Create the index with the settings and mappings above.
es.indices.create(
    index=index_name,
    body={'settings': index_settings, 'mappings': index_mappings},
)

然后使用 Python 将 MySQL 中的数据批量导入 Elasticsearch:

import pymysql
from elasticsearch import Elasticsearch


import traceback
from elasticsearch import helpers
import os

# Connect to MySQL. Credentials may be supplied via environment variables so
# they need not be written into the script; the defaults match the original.
cnx = pymysql.connect(
    user=os.environ.get('MYSQL_USER', 'root'),
    password=os.environ.get('MYSQL_PASSWORD', ''),
    host=os.environ.get('MYSQL_HOST', 'localhost'),
    database='search',
)
print('连接mysql')

# Read the whole source table, then release the connection.
try:
    with cnx.cursor() as cursor:
        cursor.execute("SELECT * FROM search_policy")
        rows = cursor.fetchall()
        # Column names, in the same order as the values in each row tuple.
        columns = [col[0] for col in cursor.description]
finally:
    cnx.close()

# Connect to Elasticsearch.
es = Elasticsearch(hosts=[os.environ.get('ES_URL', 'https://elastic:密码@IP:9200')])
print('连接es')

# Write into the index created earlier with the ik_max_word mappings
# ('policyend'); writing to an index that does not exist would make
# Elasticsearch create it with dynamic (non-IK) mappings.
index_name = 'policyend'


def _flush(batch):
    """Bulk-index one batch; log failures instead of aborting the import."""
    try:
        success, errors = helpers.bulk(es, batch, raise_on_error=True)
    except Exception:
        print('traceback.format_exc():\n%s' % traceback.format_exc())


actions = []
for row in rows:
    actions.append({"_index": index_name, "_source": dict(zip(columns, row))})

    # Import in batches of 1000 documents.
    if len(actions) >= 1000:
        print("execute 1000 action")
        _flush(actions)
        # Always start a fresh batch, even if the previous one failed;
        # otherwise the list never hits 1000 again and grows without bound.
        actions = []

# Send whatever is left over after the last full batch.
if actions:
    _flush(actions)

创建用于搜索建议(Completion Suggester)的索引,并导入标题数据:

import pymysql
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

import os

# Connect to MySQL. The password is read from the environment rather than
# hard-coded: a literal password in a shared/published script is a leak.
# NOTE(review): the password that used to be hard-coded here should be rotated.
cnx = pymysql.connect(user='root', password=os.environ.get('MYSQL_PASSWORD', ''),
                      host='localhost', database='search')
print('连接mysql')

# Read the whole source table, then release the connection.
try:
    with cnx.cursor() as cursor:
        cursor.execute("SELECT * FROM search_policy")
        res0 = cursor.fetchall()
finally:
    cnx.close()

# Connect to Elasticsearch (host URL left blank here; fill it in before running).
es = Elasticsearch(hosts=[
    ''])

# New index whose "policy_title_suggest" field is a Completion Suggester field.
index_name = 'policy_title_suggest_index'
mapping = {
    "mappings": {
        "properties": {
            "policy_title_suggest": {
                "type": "completion",  # Completion Suggester field type
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_max_word",
                "preserve_position_increments": False,
                "preserve_separators": False,
                "max_input_length": 50,
            }
        }
    }
}
# Drop any previous copy so the script can be re-run (400/404 are ignored).
es.indices.delete(index=index_name, ignore=[400, 404])
es.indices.create(index=index_name, body=mapping)


def _suggest_actions(rows):
    """Yield one bulk action per row whose second column (title) is non-empty."""
    for row in rows:
        if not row[1]:  # skip rows with an empty title
            continue
        yield {
            "_index": index_name,
            # Assumes the first column is the primary key; used as document id.
            "_id": row[0],
            "policy_title_suggest": {
                "input": [row[1]],
                "weight": 1,
            },
        }


# bulk() consumes the generator lazily and sends 5000 actions per request.
bulk(es, _suggest_actions(res0), chunk_size=5000)

# Refresh so the new suggestions are immediately searchable.
es.indices.refresh(index=index_name)

查询搜索建议:

from elasticsearch import Elasticsearch

# Connect to Elasticsearch (host URL left blank here; fill it in before running).
es = Elasticsearch(hosts=[
    ''])

# Prefix to complete.
search_word = '优化'

# Completion-suggester request body.
suggest_body = {
    # Only the suggestions are read below, so don't fetch ordinary search hits.
    "size": 0,
    "suggest": {
        "suggestion": {
            "prefix": search_word,
            "completion": {
                "field": "policy_title_suggest",  # adjust to the actual field name
                "skip_duplicates": True,
                "fuzzy": {
                    "fuzziness": "AUTO"
                }
            }
        }
    }
}

# Send the request (adjust the index name to the actual one).
response = es.search(index='policy_title_suggest_index',
                     body=suggest_body)

# Extract the suggested texts from the single "suggestion" entry.
suggestions = response['suggest']['suggestion'][0]['options']
suggestion_terms = [suggestion['text'] for suggestion in suggestions]
print(suggestion_terms)
  • Title: mysql数据导入elasticsearch
  • Author: Ikko
  • Created at : 2023-04-04 14:22:39
  • Updated at : 2026-04-25 22:00:05
  • Link: http://ikko-debug.github.io/2023/04/04/mysql数据导入elasticsearch/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments
On this page
mysql数据导入elasticsearch