Featured image of post มาลองใช้ Memphis.dev เจ้าตัว message broker ที่ใช้งานโคตรง่าย และมี ui มาให้พร้อม

มาลองใช้ Memphis.dev เจ้าตัว message broker ที่ใช้งานโคตรง่าย และมี ui มาให้พร้อม

Memphis.dev is more than a broker. It's a new streaming stack.

Memphis.dev คืออะไร

Memphis.dev คือ opensource message broker ตัวใหม่ที่ถูกสร้างขึ้นให้ใช้งานง่าย รับปริมาณการใช้งานที่สูงได้ ความหน่วงของระบบน้อย ติดตั้งง่าย platform มีขนาดกระทัดรัด สามารถดูข้อมูลเชิงลึกได้

องค์ประกอบของ Memphis.dev

  • Memphis Broker: เป็นส่วนการกระจายข้อมูล ที่ทำหน้าที่เป็นชั้นเก็บข้อมูลหลักสำหรับเหตุการณ์หรือข้อมูลที่ถูกสร้างขึ้น
  • Memphis Schemaverse: เป็นเครื่องมือการจัดการและบังคับใช้ schema ที่สร้างขึ้นใน Memphis เพื่อช่วยผู้ใช้เพิ่มคุณภาพข้อมูลและป้องกันการไหลเข้าของข้อมูลที่ไม่ถูกต้อง (ไม่ตรงตาม format)
  • Memphis Functions: เป็นเครื่องมือสำหรับนักพัฒนาที่ถูกออกแบบมาเพื้อใช้ในการจัดการ data evenet ต่างๆ
  • Memphis Connect: ถูกออกแบบให้รองรับการใช้งานได้ในหลากหลายช่องทาง เช่น ผ่าน client SDK (port 6666) (ปัจจุบันรองรับอยู่ 6 ภาษา Node.JS, Go, Python, Typescript, NestJS, REST, .NET, Kotlin) , ผ่าน CLI (port 9000) , ผ่าน GUI (port 9000) และผ่าน REST (port 4000) high-level-architecture

จุดเด่นหลักของ Memphis.dev

  • Reliability - คิวและโบรกเกอร์เป็นองค์ประกอบที่สำคัญในโครงสร้างแอปพลิเคชันสมัยใหม่ และควรมีความพร้อมใช้งานสูงและมีเสถียรภาพมากที่สุดเท่าที่จะเป็นไปได้
  • Performance and Efficiency - ประสิทธิภาพที่ยอดเยี่ยมและใช้ทรัพยากรอย่างมีประสิทธิภาพ
  • Developer Experience - ถูกออกแบบให้ใช้งานพัฒนาระบบได้อย่างง่าย และลดเวลาในการเรียนรู้และพัฒนาระบบลง
  • Observability - เพิ่มความสามารถในการสังเกต, สามารถใช้งานร่วมกับ 3rd-party monitoring tools ได้, การแจ้งเตือนแบบเรียลไทม์, ช่วยลดเวลาในการค้นหาต้นตอและแก้ไขปัญหา

✨ Key Features v1.1.1

  • Production-ready message broker in under 3 minutes
  • Easy-to-use UI, CLI, and SDKs
  • Data-level observability
  • Dead-Letter Queue with automatic message retransmit
  • Schemaverse - Embedded schema management for produced data (Protobuf/JSON/GraphQL/Avro)
  • Storage tiering
  • SDKs: Node.JS, Go, Python, Typescript, NestJS, REST, .NET, Kotlin
  • Kubernetes-native
  • Community driven

Common use cases

  • Async task management
  • Real-time streaming pipelines
  • Data ingestion
  • Async communication between services on k8s
  • Queuing
  • Multiple destinations to a single message
  • Ingest Grafana loki logs
  • Stream video frames

การติดตั้ง memphis.dev

ในการติดตั้ง memphis.dev นั้น สามารถติดตั้งได้ง่าย โดยเราสามารถเลือกได้เลยว่าเราจะติดตั้งอละใช้งานบน docker หรือ kubernetese

1
2
#Docker compose (Syntax for v2)
curl -s https://memphisdev.github.io/memphis-docker/docker-compose.yml -o docker-compose.yml && docker compose -f docker-compose.yml -p memphis up
1
2
helm repo add memphis https://k8s.memphis.dev/charts/ --force-update && 
helm install memphis memphis/memphis --create-namespace --namespace memphis --wait

สามารถเข้าไปใช้งานหน้าเว็บได้ผ่านทาง http://localhost:9000/ จากนั้นทำการตั้งค่า user สำหรับเข้าใช้งานระบบ

หากเราไม่อยากติดตั้ง ทาง memphis team มีการสร้าง cloud สำหรับให้ใช้งานด้วย โดยมี free tier ให้ใช้ที่ 1M message/month สมัครใช้งาน Memphis Cloud ได้ที่ https://cloud.memphis.dev/

หน้าการใช้งาน memphis.dev

ในการเข้าใช้งานครั้งแรก memphis จะมี tutorial guideline ให้เราเรียนรู้ เราสามารถทำตามขั้นตอนได้เลย

Create station

station เปรียบเสมือนห้องที่ใช้ในการรับส่งข้อมูล โดยใน memphis จะมีให้เราตั้งค่าดังนี้

  • station name ชื่อของ station
  • replica ส่วนของการสำรองข้อมูลให้มีการเก็บรวบรวมไว้กี่ที่ (HA mode)
  • de-duplicate ส่วนของการป้องการการซ้ำของข้อมูลที่เข้ามาภายในช่วงเวลาที่กำหนด
  • dead letter station เอาไว้สำหรับการ debug ข้อมูลที่มีปัญหาได้ง่ายขึ้น
  • retention policy เราสามารถทำการเก็บข้อมูลได้โดย ถ้าเก็บใน local storage จะมีให้เลือกว่าต้องการเก็บไว้ใน disk หรือใน memory และสามารถเลือกได้ว่าระยะเวลาในการจัดเก็บ message จะเก็บเป็นแบบกี่วัน หรือจำนวนกี่ message หรือเก็บตามขนาด size ได้ นอกจากนี้ทาง memphis ยังรองรับการเก็บผ่าน remote storage ได้อีกด้วย ในปัจจุบัน มีแค่ S3 storage

1-gl-create-station

Create Application User

ใน memphis จะมีชนิดของ user แยกออกจากกัน ระหว่าง User สำหรับการจัดการระบบและ application user (user สำหรับให้เราเอาไปใช้ใน code การเชื่อมต่อต่างๆ) ให้เราทำการสร้าง user ให้เรียบร้อย 2-gl-create-application-user

Connector Guideline

guideline ส่วนต่อมาจะเป็นตัวอย่าง code ทั้ง producer (ตัวส่ง message เข้า broker) และ consumer (ตัวรับ message จาก broker ไปใช้งาน) โดยมีรองรับหลายภาษา สามารถเลือกใช้งานตามภาษาที่ต้องการได้เลย 3-gl-create-config-produce 4-gl-create-config-consume

ลองใช้งานใน Golang SDK

Step 1: Create an empty dir for the Go project

1
2
mkdir memphis-demo && \
cd memphis-demo

Step 2: Init the newly created project

1
go mod init memphis-demo

Step 3: In your project’s directory, install Memphis Go SDK

1
go get github.com/memphisdev/memphis.go

Step 4: Create a new Go file called producer.go

ทั้ง producer.go และ consumer.go อย่าลืมตั้ง host , application user, password , station name, producer name และ consumer name ให้ตรงตามต้องการด้วย

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
	"fmt"
	"os"

	"github.com/memphisdev/memphis.go"
)

func main() {
	conn, err := memphis.Connect("localhost", "app-demo01", memphis.Password("eSdwe!320")) //host , application-user , password
	if err != nil {
		os.Exit(1)
	}
	defer conn.Close()
	p, err := conn.CreateProducer("test", "producer01") // station name and producer name

	hdrs := memphis.Headers{}
	hdrs.New()
	err = hdrs.Add("key", "value")

	if err != nil {
		fmt.Errorf("Header failed: %v", err)
		os.Exit(1)
	}

	err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))

	if err != nil {
		fmt.Errorf("Produce failed: %v", err)
		os.Exit(1)
	}
}

Step 5: Run producer.go

1
go run producer.go

Step 6: Create a new Go file called consumer.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/memphisdev/memphis.go"
)

func main() {
	conn, err := memphis.Connect("localhost", "app-demo01", memphis.Password("eSdwe!320")) //host , application-user , password
	if err != nil {
		os.Exit(1)
	}
	defer conn.Close()

	consumer, err := conn.CreateConsumer("test", "consumer01", memphis.PullInterval(15*time.Second)) // station name and consumer name

	if err != nil {
		fmt.Printf("Consumer creation failed: %v", err)
		os.Exit(1)
	}

	handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
		if err != nil {
			fmt.Printf("Fetch failed: %v", err)
			return
		}

		for _, msg := range msgs {
			fmt.Println(string(msg.Data()))
			msg.Ack()
			headers := msg.GetHeaders()
			fmt.Println(headers)
		}
	}

	ctx := context.Background()
	ctx = context.WithValue(ctx, "key", "value")
	consumer.SetContext(ctx)
	consumer.Consume(handler)

	// The program will close the connection after 30 seconds,
	// the message handler may be called after the connection closed
	// so the handler may receive a timeout error
	time.Sleep(30 * time.Second)
}

Step 7: Run consumer.go

1
go run consumer.go

เพียงเท่านี้ เราก็สามารถทำการส่งและรับข้อมูลผ่าน message broker ตัว memphis ได้อย่างง่ายดาย 5.1-output 5-station

ข้อควรรู้เพิ่มเติม หลังจากได้ลองใช้งานมาสักระยะ

  • station หากไม่สร้างก่อนระบบจะทำการสร้างให้ อัตโนมัติ
  • ในส่วนของ consumer เราสามารถสร้าง consumer group สำหรับรับข้อมูล แยกออกจากกันได้ โดยข้อมูลจะเข้า worker ตามจำนวนข้อมูลเก่าที่มีอยู่ใน storage
  • ควรระวัง consumer name และ producer name ซ้ำกันไม่ได้
  • เราสามารถใช้ schemaverse ทำการตั้งค่า input format เพื่อให้ข้อมูลใน station นั้นๆ มีเฉพาะข้อมูลที่ถูกกับ format ที่เราต้องการได้ ทำให้ message ที่เข้ามาใน station มีประสิทธิภาพมากขึ้น ปัจจุบันรองรับ protobuff , JSON schema , GraphQL schema
  • เราสามารถแปะค่าหรือข้อมูลบางอย่างไปกับ message ได้ด้วยโดยการใช้ header (producer ทำการส่ง header แปะไปกับ message ฝั่ง consumer ก็ทำการรับ message และแกะ header นั้นๆ ไปใช้) 7-schema-verse
อย่าลืมให้กำลังใจสล็อตด้วยการกดไลค์ กดแชร์บทความ กดติดตามเพจ Sloth Coding ด้วยนะฮะ ขอบคุณฮะ ;)
Licensed under CC BY-NC-SA 4.0