使用阿里云rocketmq实现简单发送和消费

简单实现一个小demo

使用阿里云rocketmq实现简单发送和消费

最近想做一个预约的功能,没看到很合适的插件,就准备自己做一个简化版的。 某天突然想到可以用延时消息去做,于是简单实现了一个版本,使用的资源是阿 里云的rocketmq 5.0,代码用的golang。

简单思想就和工单的消费和处理一样,只不过简化了下。

生产端根据用户的时间确定delay的具体时间数,我这边精确度不高,所以一分 钟轮询一次,也可以做成订阅的方式,我后续有空再研究研究。

消费端只需要负责消费就好了。

生产端

具体代码 参考了官网给的示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package main

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/* Copied from https://github.com/apache/rocketmq-clients/blob/rocketmq-client-golang-5.0.0/golang/example/producer/delay/main.go by lqx at 2023-11-28T10:33:19+08:00
 */

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

func main() {
	topic := flag.String("topic", "", "rocketmq 5.x topic")
	groupName := flag.String("group_name", "", "rocketmq 5.x group")
	endpoint := flag.String("endpoint", "", "rocketmq 5.x endpoint")
	// in rocketmq 5.x accessKey can be anything, doesn't need to be acure
	accessKey := flag.String("access_key", "", "rocketmq 5.x access key")
	// in rocketmq 5.x secretKey can be anything, doesn't need to be acure
	secretKey := flag.String("secret_key", "", "rocketmq 5.x secret key")
	delaySeconds := flag.Int("delay_seconds", 0, "rocketmq 5.x delay seconds")
	message := flag.String("message", "", "rocketmq 5.x message")
	tag := flag.String("tag", "", "rocketmq 5.x message tag")
	keys := flag.String("keys", "", "rocket 5.x message keys split with comma")

	flag.Parse()
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "false")
	golang.ResetLogger()
	// new producer instance
	producer, err := golang.NewProducer(&golang.Config{
		Endpoint:      *endpoint,
		ConsumerGroup: *groupName,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    *accessKey,
			AccessSecret: *secretKey,
		},
	},
		golang.WithTopics(*topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start producer
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// gracefule stop producer
	defer producer.GracefulStop()

	// new a message json like
	msg := &golang.Message{
		Topic: *topic,
		Body:  []byte(*message),
	}
	msg.SetTag(*tag)
	msg.SetKeys(*keys)

	// set delay timestamp
	msg.SetDelayTimestamp(time.Now().Add(time.Second * time.Duration(*delaySeconds)))
	// send message in sync
	resp, err := producer.Send(context.TODO(), msg)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%#v\n", resp)
	// wait some seconds
	time.Sleep(time.Second * 3)
}

使用命令 实际使用ak和sk可以瞎填 vpc内无认证

1
go run send_delay_message.go --topic events --group_name events-env-group --endpoint rmq-xxx.cn-hangzhou.rmq.aliyuncs.com:8080 --access_key ak1 --secret_key sk1 --delay_seconds 10 --message '{"m_type":"simple","url": "http://xxx/job/env_close_and_open_aliyun/buildWithParameters?action=stop"}' --tag env_close_open --keys key1,key2,key3

这样就发送了一个消息到对应的topic, 另一端在一定时间进行处理就好了

消费端

轮询

参考官网example

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package main

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
   Copied from https://github.com/apache/rocketmq-clients/blob/rocketmq-client-golang-5.0.0/golang/example/consumer/simple_consumer/main.go by lqx at 2023-11-27T15:34:51+08:00.
*/

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"regexp"
	"time"

	"github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisibleDuration should > 20s
	invisibleDuration = time.Second * 20
	// receive messages in a loop
)

func main() {
	topic := flag.String("topic", "", "rocketmq 5.x topic")
	groupName := flag.String("group_name", "", "rocketmq 5.x group")
	endpoint := flag.String("endpoint", "", "rocketmq 5.x endpoint")
	accessKey := flag.String("access_key", "", "rocketmq 5.x access key")
	secretKey := flag.String("secret_key", "", "rocketmq 5.x secret key")

	flag.Parse()
	// log to console
	// os.Setenv("mq.consoleAppender.enabled", "true")
	os.Setenv("rocketmq.client.logRoot", "./logs")
	os.Setenv("rocketmq.client.logLevel", "warning")
	golang.ResetLogger()
	// new simpleConsumer instance
	simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
		Endpoint:      *endpoint,
		ConsumerGroup: *groupName,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    *accessKey,
			AccessSecret: *secretKey,
		},
	},
		golang.WithAwaitDuration(awaitDuration),
		golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
			*topic: golang.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}

	// gracefule stop simpleConsumer
	defer simpleConsumer.GracefulStop()

	go func() {
		for {
			fmt.Println("start recevie message")
			mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			// ack message
			for _, mv := range mvs {
				tag := mv.GetTag()
				keys := mv.GetKeys()
				// ensure it's json like
				body := mv.GetBody()
				var data map[string]string
				err := json.Unmarshal(body, &data)
				if err != nil {
					fmt.Println(err)
					break
				}
				fmt.Println("got message tag ", *tag)
				fmt.Println("got message keys ", keys)
				fmt.Println("got message data ", data)
				// only close_open like tag will ack
				if matched, _ := regexp.Match("close_open", []byte(*tag)); matched == true {
					callback, ok := data["url"]
					if ok {
						fmt.Println("will request with url", data["url"])
						ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
						defer cancel()

						jenkins_username := os.Getenv("JENKINS_USERNAME")
						jenkins_password := os.Getenv("JENKINS_PASSWORD")
						// will send jenkins stop env requests
						req, err := http.NewRequestWithContext(ctx, http.MethodPost, callback, nil)
						req.SetBasicAuth(jenkins_username, jenkins_password)
						// make the POST request
						client := &http.Client{}
						res, err := client.Do(req)
						if err != nil {
							fmt.Println("Send jenkins request failed:", err)
							simpleConsumer.Ack(context.TODO(), mv)
							break
						} else {
							// Check the response status code
							if res.StatusCode == http.StatusOK || res.StatusCode == http.StatusCreated {
								fmt.Println("Send jenkins request successfully")
							} else {
								fmt.Println("Send jenkins request failed status:", res.Status)
							}
						}
						defer res.Body.Close()

					} else {
						fmt.Println("does not have url, ignore")
					}
				} else {
					fmt.Println("ignore this message, but ack")
				}
				simpleConsumer.Ack(context.TODO(), mv)

			}
			time.Sleep(time.Second * 10)
		}
	}()
	// Keep the application running
	done := make(chan struct{})
	<-done
}

我这个做的事情就是根据用户的url,进行对应的jenkins job触发,10s轮询一 次

这次使用的是simpleConsumer

订阅方式

订阅的时候,时间会更准确,这次是参考网上示例,用来pushConsumer

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
   Copied from https://github.com/apache/rocketmq-clients/blob/rocketmq-client-golang-5.0.0/golang/example/consumer/simple_consumer/main.go by lqx at 2023-11-27T15:34:51+08:00.
*/

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"regexp"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	topic := flag.String("topic", "", "rocketmq 5.x topic")
	groupName := flag.String("group_name", "", "rocketmq 5.x group")
	endpoint := flag.String("endpoint", "", "rocketmq 5.x endpoint")

	flag.Parse()
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "false")
	os.Setenv("rocketmq.client.logRoot", "./logs")
	os.Setenv("rocketmq.client.logLevel", "warning")
	// new simpleConsumer instance
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(*groupName),           // Specify the consumer group name
		consumer.WithNameServer([]string{*endpoint}), // Specify the name server address
	)
	if err != nil {
		fmt.Println("create consumer error ", err)
	}

	fmt.Println(" before subscribe ")
	err = c.Subscribe(string(*topic), consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		// Custom callback function to process the received messages
		fmt.Println("test")
		for _, msg := range msgs {
			fmt.Println("msg ", msg)
			tag := msg.GetTags()
			keys := msg.GetKeys()
			// ensure it's json like
			body := msg.Body
			var data map[string]string
			err := json.Unmarshal(body, &data)
			if err != nil {
				fmt.Println(err)
				break
			}
			fmt.Println("got message tag ", tag)
			fmt.Println("got message keys ", keys)
			fmt.Println("got message data ", data)
			// only close_open like tag will ack
			if matched, _ := regexp.Match("close_open", []byte(tag)); matched == true {
				callback, ok := data["url"]
				if ok {
					fmt.Println("will request with url", data["url"])
					ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
					defer cancel()

					jenkins_username := os.Getenv("JENKINS_USERNAME")
					jenkins_password := os.Getenv("JENKINS_PASSWORD")
					// will send jenkins stop env requests
					req, err := http.NewRequestWithContext(ctx, http.MethodPost, callback, nil)
					req.SetBasicAuth(jenkins_username, jenkins_password)
					// make the POST request
					client := &http.Client{}
					res, err := client.Do(req)
					if err != nil {
						fmt.Println("Send jenkins request failed:", err)
						break
					} else {
						// Check the response status code
						if res.StatusCode == http.StatusOK || res.StatusCode == http.StatusCreated {
							fmt.Println("Send jenkins request successfully")
						} else {
							fmt.Println("Send jenkins request failed status:", res.Status)
						}
					}
					defer res.Body.Close()

				} else {
					fmt.Println("does not have url, ignore")
				}
			} else {
				fmt.Println("ignore this message, but ack")
			}
		}
		return consumer.ConsumeSuccess, nil
	})

	if err != nil {
		log.Fatalf("Failed to subscribe: %v", err)
	}

	err = c.Start()
	if err != nil {
		log.Fatalf("Failed to start consumer: %v", err)
	}

	// Keep the application running
	done := make(chan struct{})
	<-done
	defer c.Shutdown()
}

这里有个问题是要注意填endpoint的时候,我必须要用ip才行,估计需要额外指 定dns

订阅方式的区别

根据官网的定义的消费方式的区别 https://rocketmq.apache.org/zh/docs/featureBehavior/06consumertype/

目前我用的区别在于simpleConsumer要自己ack,否则消息会再次被处理

整体使用是simpleConsumer的定制化更强