分布式文件存储——使用RabbitMQ实现异步上传
使用RabbitMQ实现异步上传
mq异步模式
rabbitmq介绍
安装rabbitmq
# 创建数据目录
mkdir /data/rabbitmq
# 启动mq
docker run -d --hostname rabbit-svr --name rabbit -p 5672:5672 -p 15672:15672 -p 25672:25672 -v /data/rabbitmq:/var/lib/rabbitmq rabbitmq:management
打开管理后台
http://ip:15672
账号: guest
密码: guest
代码演示
package mq
import "github.com/jyyds/filestore/common"
// 转移队列中消息载体的结构体格式
type TransferData struct {
FileHash string
CurLocation string
DestLocation string
DestStoreType common.StoreType
}
生产者
package mq
import (
"log"
"github.com/jyyds/filestore/config"
"github.com/streadway/amqp"
)
var conn *amqp.Connection
var channel *amqp.Channel
func initChannel() bool {
// 1.判断channel是否已经创建过
if channel != nil {
return true
}
// 2.获取rabbitmq的一个连接
conn, err := amqp.Dial(config.RabbitURL)
if err != nil {
log.Println(err.Error())
return false
}
// 3.打开一个channel,用于消息的发布与接收
channel, err = conn.Channel()
if err != nil {
log.Println(err.Error())
return false
}
return true
}
// 发布消息
func Puublish(exchange, routingKey string, msg []byte) bool {
// 1.判断channel是否正常
if !initChannel() {
return false
}
// 2.执行消息发布动作
err := channel.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: msg,
},
)
if err != nil {
log.Println(err.Error())
return false
}
return true
}
data := mq.TransferData{
FileHash: fileMeta.FileSha1,
CurLocation: fileMeta.Location,
DestLocation: ossPath,
DestStoreType: cmn.StoreOSS,
}
pubData, _ := json.Marshal(data)
suc := mq.Puublish(
cfg.TransExchangeName,
cfg.TransOSSRoutingKey,
pubData,
)
消费者
package mq
import "log"
var done chan bool
// 开始监听队列,获取信息
func StartConsume(qName, cName string, callBack func(msg []byte) bool) {
// 1. 用过channel.Consume获取消息信道
msgs, err := channel.Consume(
qName,
cName,
true,
false,
false,
false,
nil,
)
if err != nil {
log.Println(err.Error())
return
}
// 2. 循环获取队列的消息
go func() {
for msg := range msgs {
// 3.调用callback方法处理新的消息
procssSuc := callBack(msg.Body)
if !procssSuc {
// TODO:将任务写到另外一个队列,用于异常情况的重试
}
}
}()
// done没有新的消息过来,会一直发生阻塞
<-done
// 关闭rabbitmq
channel.Close()
}
package main
import (
"bufio"
"encoding/json"
"log"
"os"
"github.com/jyyds/filestore/config"
dblayer "github.com/jyyds/filestore/db"
"github.com/jyyds/filestore/mq"
"github.com/jyyds/filestore/store/oss"
)
func ProcessTransfer(msg []byte) bool {
// 1. 解析msg
pubData := mq.TransferData{}
err := json.Unmarshal(msg, pubData)
if err != nil {
log.Println(err.Error())
return false
}
// 2. 根据临时存储文件路径,创建文件句柄
filed, err := os.Open(pubData.CurLocation)
if err != nil {
log.Println(err.Error())
return false
}
// 3. 通过文件句柄将文件内容读出来并且上传到OSS
err = oss.Bucket().PutObject(
pubData.DestLocation,
bufio.NewReader(filed),
)
if err != nil {
log.Println(err.Error())
return false
}
// 4. 更新文件的存储路径到文件表
suc := dblayer.UpdateFileLocation(
pubData.FileHash,
pubData.DestLocation,
)
if !suc {
log.Println(err.Error())
return false
}
return true
}
func main() {
log.Println("开始监听转移任务队列...")
mq.StartConsume(
config.TransOSSQueueName,
"transfer_oss",
ProcessTransfer,
)
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgagkcj
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24