go-rabbitmq

amqp
go-amqp-reconnect

客户端样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// amqpClient consumes messages from the RabbitMQ queue queueName on the broker
// at url, sets the "instId" field of each JSON message to instID, and forwards
// the re-encoded message through rpcClientSend.
//
// url is the broker address without the scheme; "amqp://" is prepended before
// dialing (e.g. amqp://root:dahuacloud@172.100.81.13:5672/). Dialing is retried
// every two seconds until it succeeds. Once the consumer is registered the
// function blocks forever.
//
// Example arguments:
//
//	{
//	    "instId": 123,
//	    "url": "root:dahuacloud@172.100.81.13:5672",
//	    "queueName": "test_crowdDensityStatistic"
//	}
func amqpClient(instID int64, url string, queueName string) {
	log.Info("amqpClient start with url=%s,queueName=%s", url, queueName)

	addr := "amqp://" + url
	conn, err := rabbitmq.Dial(addr)
	for err != nil {
		// Report the failure and back off before dialing again, so a
		// successful dial is never followed by a sleep or a retry warning.
		log.Warn("amqpClient dial failed: %v, retry dial url=%s,queueName=%s", err, url, queueName)
		time.Sleep(2 * time.Second)
		conn, err = rabbitmq.Dial(addr)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Info("Received a message: %s", d.Body)

			jsonStr, err := withInstID(d.Body, instID)
			if err != nil {
				// The consumer is registered with auto-ack, so a bad message
				// cannot be requeued; skip it but keep consuming the rest.
				log.Error("could not process message: %v", err)
				continue
			}

			replay, err := rpcClientSend(jsonStr)
			if err != nil {
				log.Error("could not transmit: %v", err)
				continue
			}
			log.Info("server response: %s", replay)
		}
	}()

	log.Info(" [*] Waiting for messages. To exit press CTRL+C")
	// Nothing ever sends on forever; this keeps the connection and the
	// consumer goroutine alive for the lifetime of the process.
	<-forever
}

// withInstID decodes body as a JSON object, sets its "instId" field to instID,
// and returns the re-encoded JSON text.
func withInstID(body []byte, instID int64) (string, error) {
	var msg map[string]interface{}
	if err := json.Unmarshal(body, &msg); err != nil {
		return "", fmt.Errorf("decoding message: %w", err)
	}
	if msg == nil {
		// A JSON null body decodes without error but leaves the map nil;
		// assigning into it would panic.
		return "", fmt.Errorf("decoding message: body is not a JSON object")
	}
	msg["instId"] = instID

	out, err := json.Marshal(msg)
	if err != nil {
		return "", fmt.Errorf("encoding message: %w", err)
	}
	return string(out), nil
}
坚持原创技术分享,您的支持将鼓励我继续创作!