es安装

使用docker安装es

拉取镜像

1
docker pull elasticsearch:7.12.0

创建docker容器挂在的目录: (windows手动创建)

1
2
3
4
# linux的命令
mkdir -p /opt/es/config & mkdir -p /opt/es/data & mkdir -p /opt/es/plugins

chmod 777 /opt/es/data

配置文件

1
echo "http.host: 0.0.0.0" > /opt/es/config/elasticsearch.yml

创建容器

1
2
3
4
5
6
# linux
docker run --name es -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms84m -Xmx512m" -v /opt/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /opt/es/data:/usr/share/elasticsearch/data -v /opt/es/plugins:/usr/share/elasticsearch/plugins -d elasticsearch:7.12.0

# windows
docker run --name es -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms84m -Xmx512m" -v H:\\docker\\es\\config\\elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v H:\\docker\\es\\data:/usr/share/elasticsearch/data -v H:\\docker\\es\\plugins:/usr/share/elasticsearch/plugins -d elasticsearch:7.12.0
# windows添加目录映射,需要在dockerDesktop里面设置映射目录

访问ip:9200能看到东西


图形化界面

浏览器下载 Multi Elasticsearch Head es插件


Go里面链接ES

这里用ES7演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package core  

import (
"fmt"
"github.com/olivere/elastic/v7")

func Es() {
client, err := elastic.NewClient(
elastic.SetURL("http://127.0.0.1:9200"),
elastic.SetSniff(false),
elastic.SetBasicAuth("", ""),
) if err != nil {
fmt.Printf("Error: %v", err)
return
}
fmt.Printf("Connected to Elasticsearch %+v", client)
}

关于SetSniff

  • Sniff = 客户端启动后,主动去 ES 集群问一圈: “你们集群里到底有哪些节点?我以后该连谁?”

ES 集群真实情况

一个 ES 集群通常是这样的:

1
2
3
4
5
[Client]    
|
| → 192.168.1.10:9200(master)
| → 192.168.1.11:9200data
| → 192.168.1.12:9200data)`

但你在代码里只配置了一个地址

http://localhost:9200

那客户端怎么知道 还有别的节点

👉 这就靠 Sniff
当在代码链接里写:

1
client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"), elastic.SetSniff(true))

启动后会发生:

1️. 客户端请求:

GET /_nodes/http

2️. ES 返回集群中所有节点的地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{   
"nodes": {
"node1": {
"http": {
"publish_address": "10.0.0.1:9200"
}
},
"node2": {
"http": {
"publish_address": "10.0.0.2:9200"
}
}
}
}

3️. 客户端:

  • 自动把所有节点加入连接池

  • 后续请求 负载均衡

  • 节点挂了会自动剔除

    这是一个“客户端级别的服务发现 + 负载均衡”

必须关闭

在 Docker、K8s、云 ES、反向代理环境下必须关闭,否则客户端会尝试连接不可达的内部地址。
例如部署在docker的127.0.0.1:9200, 使用sniff后会拿到http:127.0.0.1,这时候宿主机和其他服务访问不到,会报错
![[{19E41DF3-51CE-499C-B2DE-FA799E611ECD}.png]]


ES设置密码

不需要的密码情况:

  1. 服务器自己使用,云服务器使用ES,但是9200, 9300端口没有对外开放
    对外提供服务:
  2. es提供对外服务时候

ES索引操作

type 选择速查表

场景 type 用法时机 实例
全文搜索 text
精确匹配 keyword
搜索 + 排序 text + keyword
主键 ID long / keyword
金额 scaled_float
时间 date 不要写错 ,不要写错 format,不要写错 format,不要写错 format,不要写错 format, 不要。。。不要写错 format
`”create_at”: { “type”: “date”, “format”: “yyyy-MM-dd HH:mm:ss\
\ strict_date_optional_time\ \ epoch_millis” }`
枚举 keyword
布尔 boolean
对象数组 nested "chapters": [ { "id": 1, "title": "第一章" }, { "id": 2, "title": "第二章" } ] 查询慢

- 占用更多内存
普通对象 object
自动补全(搜索联想)
completion - 自动补全

- 输入提示
语义搜索 dense_vector
ip 特殊类型

能不用 nested 就不用
能拆 document 就不嵌套
全文 = text,过滤 = keyword
ID 永远不要用 text
中文必须考虑 analyzer

创建索引

实操:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package models  

import "time"

type User struct {
ID uint `json:"id"`
Title string `json:"title"`
NickName string `json:"nick_name"`
CreateAt time.Time `json:"create_at"`
}

func (User) Mapping() string {
return `
{
"mappings": {
"properties": {
"title": {
"type": "text"
},
"nick_name": {
"type": "keyword"
},
"id": {
"type": "integer"
},
"created_at": {
"type": "date",
"null_value": "null",
"format": "[yyyy-MM-dd HH:mm:ss]"
}
}
}
`
}

create:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package index  

import (
"context"
"dailycode/models" "dailycode/resource" "errors" "fmt")

func CreateIndex(ctx context.Context, index string) error {
ck, err := Exist_index(ctx, index)
if err != nil {
fmt.Printf("%v", err)
return errors.New("查询索引失败")
}
if ck {
fmt.Println("索引已存在")
return nil
}

idx, err := resource.EsClient.
CreateIndex(index).BodyString(models.User{}.Mapping()).Do(ctx)
if err != nil || !idx.Acknowledged {
fmt.Printf("%v", err)
return errors.New("创建索引失败")
}

return nil
}

func Exist_index(ctx context.Context, index string) (exist bool, err error) {
return resource.EsClient.IndexExists(index).Do(ctx)
}

查询索引:浏览器访问:
http://127.0.0.1:9200/user/_mapping
本质请求了查询接口

删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package index  

import (
"context"
"dailycode/resource" "errors")

func DeleteIndex(ctx context.Context, index string) error {
ok, err := Exist_index(ctx, index)
if err != nil {
return errors.New("查询索引失败")
}
if ok {
x, err := resource.EsClient.DeleteIndex(index).Do(ctx)
if err != nil {
return err
}
if x == nil {
return errors.New("删除索引失败, empty response")
}
if !x.Acknowledged {
return errors.New("删除索引失败")
}
}
return nil
}

更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package index  

import (
"context"
)

func UpdateIndex(ctx context.Context, index string) error {
err := DeleteIndex(ctx, index)
if err != nil {
return err
}
err = CreateIndex(ctx, index)
if err != nil {
return err
}
return nil
}

增删改查文档

  • 增加:
    1
    2
    3
    4
    5
    6
    7
    8
    func CreateDoc(ctx context.Context, user models.User, index string) (indexId string, err error) {  
    resp, err := resource.EsClient.Index().Index(index).BodyJson(user).Do(ctx)
    if err != nil {
    return "", err
    }
    fmt.Printf("%#v", resp)
    return resp.Id, nil
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
elastic.IndexResponse
{
Index:"user",
Type:"_doc",
Id:"TM_2PpsB-ucwcoQEQuVP",
Version:1,
Result:"created",
Shards:(*elastic.ShardsInfo)(0xc000030780),
SeqNo:0,
PrimaryTerm:1,
Status:0,
ForcedRefresh:false
}

如果是mapping里面没有,es会自己创建新的字段,应该是es自己实现上的问题

  • 删除

根据ID删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package docs  

import (
"context"
index2 "dailycode/index"
"dailycode/models" "dailycode/resource" "errors" "fmt")

func DeleteDoc(ctx context.Context, index string, id string) error {
if est, err := index2.ExistIndex(ctx, index); err != nil {
return errors.New("查询索引失败")
} else {
if !est {
return errors.New("索引不存在")
} }
_, err := resource.EsClient.Delete().
Index(models.User{}.IndexName()).Id(id).Refresh("true").Do(ctx)
if err != nil {
fmt.Printf("%v", err)
return errors.New("删除文档失败")
}
return nil
}

  • 批量删除

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    func BatchDeleteDoc(ctx context.Context, index string, ids []string) (ret []string, err error) {  
    if est, err := index2.ExistIndex(ctx, index); err != nil {
    return nil, errors.New("查询索引失败")
    } else {
    if !est {
    return nil, errors.New("索引不存在")
    } }
    bulk := resource.EsClient.Bulk().Index(models.User{}.IndexName()).Refresh("true")
    for _, s := range ids {
    req := elastic.NewBulkDeleteRequest().Id(s)
    bulk.Add(req)
    }
    res, err := bulk.Do(ctx)
    if err != nil {
    fmt.Printf("%v", err)
    return nil, errors.New("批量删除文档失败")
    }
    //b, _ := json.MarshalIndent(res, "", " ")
    //fmt.Println(string(b)) for _, item := range res.Succeeded() {
    ret = append(ret, item.Id)
    }
    return
    }
  • 批量添加

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    func BatchCreateDoc(ctx context.Context, users []models.User, index string) (ids []string, err error) {  
    bulk := resource.EsClient.Bulk().Index(index).Refresh("true")
    for _, user := range users {
    req := elastic.NewBulkIndexRequest().Doc(user)
    bulk.Add(req)
    } res, err := bulk.Do(ctx)
    if err != nil {
    return nil, err
    }

    for _, item := range res.Items {
    ids = append(ids, item["index"].Id)
    }
    return
    }

看了下bulk的结构,应该是一个类似操作器的东西,可以存入对应的操作,用Do一次性触发,返回的item如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
"took": 110,
"items": [
{
"delete": {
"_index": "user",
"_type": "_doc",
"_id": "gs8gP5sB-ucwcoQE7uV3",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 103,
"_primary_term": 1,
"status": 200,
"forced_refresh": true
}
},
{
"delete": {
"_index": "user",
"_type": "_doc",
"_id": "g88gP5sB-ucwcoQE7uV3",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 104,
"_primary_term": 1,
"status": 200,
"forced_refresh": true
}
}
]
}

文档更新

板子:

更新配置(可拓展其他配置项)

1
2
3
4
5
6
7
8
type UpdateOptions struct {
Index string
ID string
Doc map[string]interface{} // 要更新的字段
Upsert bool // 不存在是否插入
RetryOnConf int // 乐观锁冲突重试
Refresh string // true / wait_for / false
}

通用Update:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package esutil

import (
"context"
"fmt"
)

func UpdateDoc(ctx context.Context, opt UpdateOptions) error {
if opt.Index == "" || opt.ID == "" {
return fmt.Errorf("index or id is empty")
}
if len(opt.Doc) == 0 {
return nil
}

req := resource.ESClient.Update().
Index(opt.Index).
Id(opt.ID).
Doc(opt.Doc)

// 是否 upsert
if opt.Upsert {
req = req.DocAsUpsert(true)
}

// 版本冲突重试
if opt.RetryOnConf > 0 {
req = req.RetryOnConflict(opt.RetryOnConf)
}

// 刷新策略
if opt.Refresh != "" {
req = req.Refresh(opt.Refresh)
}

resp, err := req.Do(ctx)
if err != nil {
return fmt.Errorf("es update failed: %w", err)
}

if resp == nil || resp.Result == "" {
return fmt.Errorf("es update failed: empty response")
}

return nil
}

注意慎用refresh = true

使用例子:

1
2
3
4
5
6
7
8
err := esutil.UpdateDoc(ctx, resource.EsClient, esutil.UpdateOptions{
Index: "novel_chapter_v1",
ID: "123",
Doc: map[string]interface{}{
"title": "新的章节标题",
"update_time": time.Now(),
},
})

并发安全更新
1
2
3
4
5
6
7
8
err := esutil.UpdateDoc(ctx, resource.EsClient, esutil.UpdateOptions{
Index: "novel_chapter_v1",
ID: "123",
RetryOnConf: 3,
Doc: map[string]interface{}{
"view_count": 100,
},
})

upsert
1
2
3
4
5
6
7
8
9
10
11
err := esutil.UpdateDoc(ctx, resource.EsClient, esutil.UpdateOptions{
Index: "novel_chapter_v1",
ID: "123",
Upsert: true,
Doc: map[string]interface{}{
"book_id": 1,
"chapter_title": "第一章",
"content": "少年开始修炼……",
"create_at": time.Now(),
},
})

数据同步时候或者MQ Sync时候经常使用


文档查询

列表查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Search(ctx context.Context, index string) (ret []models.User, err error) {  
query := elastic.NewBoolQuery()
res, err := resource.EsClient.Search().
Index(models.User{}.IndexName()).
Query(query).
From(0).
Size(10).
Do(ctx)
if err != nil {
fmt.Printf("%v", err)
return nil, errors.New("查询文档失败")
}
for _, v := range res.Hits.Hits {
var user models.User
err := json.Unmarshal(v.Source, &user)
if err != nil {
fmt.Printf("%v", err)
continue
}
ret = append(ret, user)
}
return
}

精确查询

针对keyword类型。精确查询值为value的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Search(ctx context.Context, keyword string, value string) (ret []models.User, err error) {  
query := elastic.NewTermQuery(keyword, value)
res, err := resource.EsClient.Search().
Index(models.User{}.IndexName()).
Query(query).
From(0).
Size(10).
Do(ctx)
if err != nil {
fmt.Printf("%v", err)
return nil, errors.New("查询文档失败")
}
for _, v := range res.Hits.Hits {
var user models.User
err := json.Unmarshal(v.Source, &user)
if err != nil {
fmt.Printf("%v", err)
continue
}
ret = append(ret, user)
}
return
}

调用search (ctx, “nick_name”, “你好”)
返回的是必须值为你好的条

模糊查询

主要针对于text类型,穿得是keyword时候会退化为term,不会模糊查询,还是得完全相等,数值类型也是,本质是把数据变成一个token去倒排索引里面找索引,实际是精确匹配

实现原理:以英文为例子,插入时候每个单词被单独分出来,记录每个单词出现的索引ID位置,之后查询时根据单词定位到对应文本实现模糊匹配,实现可能会类似于字典树? 但是节点存储的是索引而不是具体的字母。不同的分析器分词不一样,中文一般用ik_max_word / ik_smart, 查询时候会有权重分,如使用BM25,
score ≈
词在文档中出现的频率
× 词的稀有程度
÷ 文档长度
含义:

同时命中多个查询词 → 分高

词越少见 → 权重越高

文档越短 → 分越高

所以章节级索引比整本书更准

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Search(ctx context.Context, text string, value string) (ret []models.User, err error) {  
query := elastic.NewMatchQuery(text, value)
res, err := resource.EsClient.Search().
Index(models.User{}.IndexName()).
Query(query).
From(0).
Size(10).
Do(ctx)
if err != nil {
fmt.Printf("%v", err)
return nil, errors.New("查询文档失败")
}
for _, v := range res.Hits.Hits {
var user models.User
err := json.Unmarshal(v.Source, &user)
if err != nil {
fmt.Printf("%v", err)
continue
}
ret = append(ret, user)
}
return
}

运行例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {  
core.Es()
fmt.Printf("Global %+v\n", resource.EsClient)

var Infos = []models.User {
{
ID: 1,
Title: "keep running every day make you healthy",
NickName: "nick_name1",
CreateAt: time.Now(),
},
{
ID: 2,
Title: "I running on the plain",
NickName: "nick_name2",
CreateAt: time.Now(),
},
}
docs.BatchCreateDoc(context.Background(), Infos, models.User{}.IndexName())

x, err := docs.Search(context.Background(), "title", "running")
if err != nil {
fmt.Printf("%v", err)
}
fmt.Printf("============\n查询结果: %+v\n", x)

}

根据running模糊查询出了新加的这两条

嵌套查询

最常用的是text里面嵌套keyword,根据text里面的分词精确匹配,比如小说的章节里面带某某名字的,创建索引时候

1
2
3
4
5
6
7
8
9
10
11
{
"title": {
"type": "text",
"fields": {
"key_word": {
"type": "keyword",
"ignore_above: 256
}
},
}
}

这时候如果想要精确匹配某个text叫做哇啦啦的title,直接使用NewMatchSearch是搜不到的,因为分词可能并没有分到这个字段,如果里面嵌了关键词,ES的实现会根据分词精确查找,调用

1
query := elastic.NewTermQuery(ctx, "title.key_word", "哇啦啦")

可以精确搜到
如果没有这个嵌入
调用
1
query := elastic.NewMatchQuery(ctx, "title", "哇啦啦")

是搜不到的