ElasticSearch基础
es安装
使用docker安装es
拉取镜像
1 | docker pull elasticsearch:7.12.0 |
创建docker容器挂在的目录: (windows手动创建)
1 | # linux的命令 |
配置文件
1 | echo "http.host: 0.0.0.0" > /opt/es/config/elasticsearch.yml |
创建容器
1 | # linux |
访问ip:9200能看到东西
图形化界面
浏览器下载 Multi Elasticsearch Head es插件
Go里面链接ES
这里用ES7演示1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17package core
import (
"fmt"
"github.com/olivere/elastic/v7")
func Es() {
client, err := elastic.NewClient(
elastic.SetURL("http://127.0.0.1:9200"),
elastic.SetSniff(false),
elastic.SetBasicAuth("", ""),
) if err != nil {
fmt.Printf("Error: %v", err)
return
}
fmt.Printf("Connected to Elasticsearch %+v", client)
}
关于SetSniff
- Sniff = 客户端启动后,主动去 ES 集群问一圈: “你们集群里到底有哪些节点?我以后该连谁?”
ES 集群真实情况
一个 ES 集群通常是这样的:1
2
3
4
5[Client]
|
| → 192.168.1.10:9200(master)
| → 192.168.1.11:9200(data)
| → 192.168.1.12:9200(data)`
但你在代码里只配置了一个地址:
http://localhost:9200
那客户端怎么知道 还有别的节点?
👉 这就靠 Sniff。
当在代码链接里写:1
client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"), elastic.SetSniff(true))
启动后会发生:
1️. 客户端请求:
GET /_nodes/http
2️. ES 返回集群中所有节点的地址:1
2
3
4
5
6
7
8
9
10
11
12
13
14{
"nodes": {
"node1": {
"http": {
"publish_address": "10.0.0.1:9200"
}
},
"node2": {
"http": {
"publish_address": "10.0.0.2:9200"
}
}
}
}
3️. 客户端:
自动把所有节点加入连接池
后续请求 负载均衡
节点挂了会自动剔除
这是一个“客户端级别的服务发现 + 负载均衡”
必须关闭
在 Docker、K8s、云 ES、反向代理环境下必须关闭,否则客户端会尝试连接不可达的内部地址。
例如部署在docker的127.0.0.1:9200, 使用sniff后会拿到http:127.0.0.1,这时候宿主机和其他服务访问不到,会报错
![[{19E41DF3-51CE-499C-B2DE-FA799E611ECD}.png]]
ES设置密码
不需要的密码情况:
- 服务器自己使用,云服务器使用ES,但是9200, 9300端口没有对外开放
对外提供服务: - es提供对外服务时候
ES索引操作
type 选择速查表
| 场景 | type | 用法时机 | 实例 | ||||
|---|---|---|---|---|---|---|---|
| 全文搜索 | text | ||||||
| 精确匹配 | keyword | ||||||
| 搜索 + 排序 | text + keyword | ||||||
| 主键 ID | long / keyword | ||||||
| 金额 | scaled_float | ||||||
| 时间 | date | 不要写错 ,不要写错 format,不要写错 format,不要写错 format,不要写错 format, 不要。。。不要写错 format | `”create_at”: { “type”: “date”, “format”: “yyyy-MM-dd HH:mm:ss\ |
\ | strict_date_optional_time\ | \ | epoch_millis” }` |
| 枚举 | keyword | ||||||
| 布尔 | boolean | ||||||
| 对象数组 | nested | "chapters": [ { "id": 1, "title": "第一章" }, { "id": 2, "title": "第二章" } ] 查询慢- 占用更多内存 |
|||||
| 普通对象 | object | ||||||
| 自动补全(搜索联想) |
completion | - 自动补全 - 输入提示 |
|||||
| 语义搜索 | dense_vector | ||||||
| ip | 特殊类型 |
能不用 nested 就不用
能拆 document 就不嵌套
全文 = text,过滤 = keyword
ID 永远不要用 text
中文必须考虑 analyzer
创建索引
实操:
1 | package models |
create:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30package index
import (
"context"
"dailycode/models" "dailycode/resource" "errors" "fmt")
func CreateIndex(ctx context.Context, index string) error {
ck, err := Exist_index(ctx, index)
if err != nil {
fmt.Printf("%v", err)
return errors.New("查询索引失败")
}
if ck {
fmt.Println("索引已存在")
return nil
}
idx, err := resource.EsClient.
CreateIndex(index).BodyString(models.User{}.Mapping()).Do(ctx)
if err != nil || !idx.Acknowledged {
fmt.Printf("%v", err)
return errors.New("创建索引失败")
}
return nil
}
func Exist_index(ctx context.Context, index string) (exist bool, err error) {
return resource.EsClient.IndexExists(index).Do(ctx)
}
查询索引:浏览器访问:
http://127.0.0.1:9200/user/_mapping
本质请求了查询接口
删除
1 | package index |
更新
1 | package index |
增删改查文档
- 增加:
1
2
3
4
5
6
7
8func CreateDoc(ctx context.Context, user models.User, index string) (indexId string, err error) {
resp, err := resource.EsClient.Index().Index(index).BodyJson(user).Do(ctx)
if err != nil {
return "", err
}
fmt.Printf("%#v", resp)
return resp.Id, nil
}
1 | elastic.IndexResponse |
如果是mapping里面没有,es会自己创建新的字段,应该是es自己实现上的问题
- 删除
根据ID删除1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22package docs
import (
"context"
index2 "dailycode/index"
"dailycode/models" "dailycode/resource" "errors" "fmt")
func DeleteDoc(ctx context.Context, index string, id string) error {
if est, err := index2.ExistIndex(ctx, index); err != nil {
return errors.New("查询索引失败")
} else {
if !est {
return errors.New("索引不存在")
} }
_, err := resource.EsClient.Delete().
Index(models.User{}.IndexName()).Id(id).Refresh("true").Do(ctx)
if err != nil {
fmt.Printf("%v", err)
return errors.New("删除文档失败")
}
return nil
}
批量删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23func BatchDeleteDoc(ctx context.Context, index string, ids []string) (ret []string, err error) {
if est, err := index2.ExistIndex(ctx, index); err != nil {
return nil, errors.New("查询索引失败")
} else {
if !est {
return nil, errors.New("索引不存在")
} }
bulk := resource.EsClient.Bulk().Index(models.User{}.IndexName()).Refresh("true")
for _, s := range ids {
req := elastic.NewBulkDeleteRequest().Id(s)
bulk.Add(req)
}
res, err := bulk.Do(ctx)
if err != nil {
fmt.Printf("%v", err)
return nil, errors.New("批量删除文档失败")
}
//b, _ := json.MarshalIndent(res, "", " ")
//fmt.Println(string(b)) for _, item := range res.Succeeded() {
ret = append(ret, item.Id)
}
return
}批量添加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15func BatchCreateDoc(ctx context.Context, users []models.User, index string) (ids []string, err error) {
bulk := resource.EsClient.Bulk().Index(index).Refresh("true")
for _, user := range users {
req := elastic.NewBulkIndexRequest().Doc(user)
bulk.Add(req)
} res, err := bulk.Do(ctx)
if err != nil {
return nil, err
}
for _, item := range res.Items {
ids = append(ids, item["index"].Id)
}
return
}
看了下bulk的结构,应该是一个类似操作器的东西,可以存入对应的操作,用Do一次性触发,返回的item如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41{
"took": 110,
"items": [
{
"delete": {
"_index": "user",
"_type": "_doc",
"_id": "gs8gP5sB-ucwcoQE7uV3",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 103,
"_primary_term": 1,
"status": 200,
"forced_refresh": true
}
},
{
"delete": {
"_index": "user",
"_type": "_doc",
"_id": "g88gP5sB-ucwcoQE7uV3",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 104,
"_primary_term": 1,
"status": 200,
"forced_refresh": true
}
}
]
}
文档更新
板子:
更新配置(可拓展其他配置项)1
2
3
4
5
6
7
8type UpdateOptions struct {
Index string
ID string
Doc map[string]interface{} // 要更新的字段
Upsert bool // 不存在是否插入
RetryOnConf int // 乐观锁冲突重试
Refresh string // true / wait_for / false
}
通用Update:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47package esutil
import (
"context"
"fmt"
)
func UpdateDoc(ctx context.Context, opt UpdateOptions) error {
if opt.Index == "" || opt.ID == "" {
return fmt.Errorf("index or id is empty")
}
if len(opt.Doc) == 0 {
return nil
}
req := resource.ESClient.Update().
Index(opt.Index).
Id(opt.ID).
Doc(opt.Doc)
// 是否 upsert
if opt.Upsert {
req = req.DocAsUpsert(true)
}
// 版本冲突重试
if opt.RetryOnConf > 0 {
req = req.RetryOnConflict(opt.RetryOnConf)
}
// 刷新策略
if opt.Refresh != "" {
req = req.Refresh(opt.Refresh)
}
resp, err := req.Do(ctx)
if err != nil {
return fmt.Errorf("es update failed: %w", err)
}
if resp == nil || resp.Result == "" {
return fmt.Errorf("es update failed: empty response")
}
return nil
}
注意慎用refresh = true
使用例子:1
2
3
4
5
6
7
8err := esutil.UpdateDoc(ctx, resource.EsClient, esutil.UpdateOptions{
Index: "novel_chapter_v1",
ID: "123",
Doc: map[string]interface{}{
"title": "新的章节标题",
"update_time": time.Now(),
},
})
并发安全更新1
2
3
4
5
6
7
8err := esutil.UpdateDoc(ctx, resource.EsClient, esutil.UpdateOptions{
Index: "novel_chapter_v1",
ID: "123",
RetryOnConf: 3,
Doc: map[string]interface{}{
"view_count": 100,
},
})
upsert1
2
3
4
5
6
7
8
9
10
11err := esutil.UpdateDoc(ctx, resource.EsClient, esutil.UpdateOptions{
Index: "novel_chapter_v1",
ID: "123",
Upsert: true,
Doc: map[string]interface{}{
"book_id": 1,
"chapter_title": "第一章",
"content": "少年开始修炼……",
"create_at": time.Now(),
},
})
数据同步时候或者MQ Sync时候经常使用
文档查询
列表查询
1 | func Search(ctx context.Context, index string) (ret []models.User, err error) { |
精确查询
针对keyword类型。精确查询值为value的数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23func Search(ctx context.Context, keyword string, value string) (ret []models.User, err error) {
query := elastic.NewTermQuery(keyword, value)
res, err := resource.EsClient.Search().
Index(models.User{}.IndexName()).
Query(query).
From(0).
Size(10).
Do(ctx)
if err != nil {
fmt.Printf("%v", err)
return nil, errors.New("查询文档失败")
}
for _, v := range res.Hits.Hits {
var user models.User
err := json.Unmarshal(v.Source, &user)
if err != nil {
fmt.Printf("%v", err)
continue
}
ret = append(ret, user)
}
return
}
调用search (ctx, “nick_name”, “你好”)
返回的是必须值为你好的条
模糊查询
主要针对于text类型,穿得是keyword时候会退化为term,不会模糊查询,还是得完全相等,数值类型也是,本质是把数据变成一个token去倒排索引里面找索引,实际是精确匹配
实现原理:以英文为例子,插入时候每个单词被单独分出来,记录每个单词出现的索引ID位置,之后查询时根据单词定位到对应文本实现模糊匹配,实现可能会类似于字典树? 但是节点存储的是索引而不是具体的字母。不同的分析器分词不一样,中文一般用ik_max_word / ik_smart, 查询时候会有权重分,如使用BM25,
score ≈
词在文档中出现的频率
× 词的稀有程度
÷ 文档长度
含义:
同时命中多个查询词 → 分高
词越少见 → 权重越高
文档越短 → 分越高
所以章节级索引比整本书更准
1 | func Search(ctx context.Context, text string, value string) (ret []models.User, err error) { |
运行例子1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func main() {
core.Es()
fmt.Printf("Global %+v\n", resource.EsClient)
var Infos = []models.User {
{
ID: 1,
Title: "keep running every day make you healthy",
NickName: "nick_name1",
CreateAt: time.Now(),
},
{
ID: 2,
Title: "I running on the plain",
NickName: "nick_name2",
CreateAt: time.Now(),
},
}
docs.BatchCreateDoc(context.Background(), Infos, models.User{}.IndexName())
x, err := docs.Search(context.Background(), "title", "running")
if err != nil {
fmt.Printf("%v", err)
}
fmt.Printf("============\n查询结果: %+v\n", x)
}
根据running模糊查询出了新加的这两条
嵌套查询
最常用的是text里面嵌套keyword,根据text里面的分词精确匹配,比如小说的章节里面带某某名字的,创建索引时候1
2
3
4
5
6
7
8
9
10
11{
"title": {
"type": "text",
"fields": {
"key_word": {
"type": "keyword",
"ignore_above: 256
}
},
}
}
这时候如果想要精确匹配某个text叫做哇啦啦的title,直接使用NewMatchSearch是搜不到的,因为分词可能并没有分到这个字段,如果里面嵌了关键词,ES的实现会根据分词精确查找,调用1
query := elastic.NewTermQuery(ctx, "title.key_word", "哇啦啦")
可以精确搜到
如果没有这个嵌入
调用1
query := elastic.NewMatchQuery(ctx, "title", "哇啦啦")
是搜不到的
