• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

分布式文件存储——使用RabbitMQ实现异步上传

武飞扬头像
~庞贝
帮助1

使用RabbitMQ实现异步上传

mq异步模式

学新通

学新通

rabbitmq介绍

学新通

学新通

学新通

学新通

学新通

安装rabbitmq

# 创建数据目录
mkdir /data/rabbitmq

# 启动mq
docker run -d --hostname rabbit-svr --name rabbit -p 5672:5672 -p 15672:15672 -p 25672:25672 -v /data/rabbitmq:/var/lib/rabbitmq rabbitmq:management

打开管理后台

http://ip:15672

账号: guest
密码: guest

代码演示

package mq

import "github.com/jyyds/filestore/common"

// 转移队列中消息载体的结构体格式
type TransferData struct {
	FileHash      string
	CurLocation   string
	DestLocation  string
	DestStoreType common.StoreType
}

生产者

package mq

import (
	"log"

	"github.com/jyyds/filestore/config"
	"github.com/streadway/amqp"
)

var conn *amqp.Connection
var channel *amqp.Channel

func initChannel() bool {
	//  1.判断channel是否已经创建过
	if channel != nil {
		return true
	}
	//  2.获取rabbitmq的一个连接
	conn, err := amqp.Dial(config.RabbitURL)
	if err != nil {
		log.Println(err.Error())
		return false
	}

	//  3.打开一个channel,用于消息的发布与接收
	channel, err = conn.Channel()
	if err != nil {
		log.Println(err.Error())
		return false
	}
	return true

}

// 发布消息
func Puublish(exchange, routingKey string, msg []byte) bool {
	// 1.判断channel是否正常
	if !initChannel() {
		return false
	}

	// 2.执行消息发布动作
	err := channel.Publish(
		exchange,
		routingKey,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        msg,
		},
	)
	if err != nil {
		log.Println(err.Error())
		return false
	}

	return true
}

学新通
	data := mq.TransferData{
			FileHash:      fileMeta.FileSha1,
			CurLocation:   fileMeta.Location,
			DestLocation:  ossPath,
			DestStoreType: cmn.StoreOSS,
		}
		pubData, _ := json.Marshal(data)
		suc := mq.Puublish(
			cfg.TransExchangeName,
			cfg.TransOSSRoutingKey,
			pubData,
		)

消费者

package mq

import "log"

var done chan bool

// 开始监听队列,获取信息
func StartConsume(qName, cName string, callBack func(msg []byte) bool) {
	// 1. 用过channel.Consume获取消息信道
	msgs, err := channel.Consume(
		qName,
		cName,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Println(err.Error())
		return
	}

	// 2. 循环获取队列的消息

	go func() {
		for msg := range msgs {
			// 3.调用callback方法处理新的消息
			procssSuc := callBack(msg.Body)
			if !procssSuc {
				// TODO:将任务写到另外一个队列,用于异常情况的重试
			}
		}
	}()

	// done没有新的消息过来,会一直发生阻塞
	<-done

	// 关闭rabbitmq
	channel.Close()

}

学新通
package main

import (
	"bufio"
	"encoding/json"
	"log"
	"os"

	"github.com/jyyds/filestore/config"
	dblayer "github.com/jyyds/filestore/db"
	"github.com/jyyds/filestore/mq"
	"github.com/jyyds/filestore/store/oss"
)

func ProcessTransfer(msg []byte) bool {
	// 1. 解析msg
	pubData := mq.TransferData{}
	err := json.Unmarshal(msg, pubData)
	if err != nil {
		log.Println(err.Error())
		return false
	}

	// 2. 根据临时存储文件路径,创建文件句柄
	filed, err := os.Open(pubData.CurLocation)
	if err != nil {
		log.Println(err.Error())
		return false
	}

	// 3. 通过文件句柄将文件内容读出来并且上传到OSS

	err = oss.Bucket().PutObject(
		pubData.DestLocation,
		bufio.NewReader(filed),
	)
	if err != nil {
		log.Println(err.Error())
		return false
	}

	// 4. 更新文件的存储路径到文件表
	suc := dblayer.UpdateFileLocation(
		pubData.FileHash,
		pubData.DestLocation,
	)

	if !suc {
		log.Println(err.Error())
		return false
	}

	return true
}

func main() {
	log.Println("开始监听转移任务队列...")
	mq.StartConsume(
		config.TransOSSQueueName,
		"transfer_oss",
		ProcessTransfer,
	)
}

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgagkcj
系列文章
更多 icon
同类精品
更多 icon
继续加载