• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

golang使用泛型实现mapreduce操作

武飞扬头像
我终于有blog了
帮助1

1.使用面向对象的方式写

  1.  
    package stream
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "log"
  6.  
    "reflect"
  7.  
    "sort"
  8.  
    "strconv"
  9.  
    "strings"
  10.  
    )
  11.  
     
  12.  
    type Stream[T any] struct {
  13.  
    data []T
  14.  
    keyBy string
  15.  
    sortByNum string
  16.  
    sortByStr []string
  17.  
    }
  18.  
     
  19.  
    func FromElement[T any](data []T) *Stream[T] {
  20.  
    return &Stream[T]{
  21.  
    data: data,
  22.  
    }
  23.  
    }
  24.  
     
  25.  
    // 过滤算子
  26.  
    type filterfunc[F any] func(F) bool
  27.  
     
  28.  
    func (s *Stream[T]) Filter(filterFun filterfunc[T]) *Stream[T] {
  29.  
    var new []T
  30.  
    for _, item := range s.data {
  31.  
    isfiltered := filterFun(item)
  32.  
    if isfiltered {
  33.  
    continue
  34.  
    }
  35.  
    new = append(new, item)
  36.  
    }
  37.  
    s.data = new
  38.  
    return s
  39.  
    }
  40.  
     
  41.  
    // 单行处理
  42.  
    type mapfunc[F any] func(F) F
  43.  
     
  44.  
    func (s *Stream[T]) Map(mapFun mapfunc[T]) *Stream[T] {
  45.  
    for idx, item := range s.data {
  46.  
    ret := mapFun(item)
  47.  
    s.data[idx] = ret
  48.  
    }
  49.  
    return s
  50.  
    }
  51.  
     
  52.  
    // 排序
  53.  
    func (s *Stream[T]) SortByNum(key string) *Stream[T] {
  54.  
    s.sortByNum = key
  55.  
    if len(s.sortByStr) > 0 {
  56.  
    s.sortByStr = nil
  57.  
    }
  58.  
    return s
  59.  
    }
  60.  
     
  61.  
    // 每次排序只能使用一种排
  62.  
    func (s *Stream[T]) SortByStr(keys ...string) *Stream[T] {
  63.  
    s.sortByStr = keys
  64.  
    if s.sortByNum != "" {
  65.  
    s.sortByNum = ""
  66.  
    }
  67.  
    return s
  68.  
    }
  69.  
     
  70.  
    func (s *Stream[T]) Sort(esc bool) *Stream[T] {
  71.  
    if s.sortByNum == "" && len(s.sortByStr) == 0 {
  72.  
    log.Println("please call SortBy() before sort()")
  73.  
    return s
  74.  
    }
  75.  
    if s.sortByNum != "" {
  76.  
    sort.Slice(s.data, func(i, j int) bool {
  77.  
    v := reflect.ValueOf(s.data[i]).Elem()
  78.  
    field := v.FieldByName(s.sortByNum)
  79.  
    if !field.IsValid() {
  80.  
    log.Panicf("field=%s not valid", s.sortByNum)
  81.  
    }
  82.  
    idata := fmt.Sprintf("%v", field.Interface())
  83.  
    num, err := strconv.ParseInt(idata, 10, 64)
  84.  
    if err != nil {
  85.  
    log.Panic("please use num when use sortByNum", idata)
  86.  
    }
  87.  
     
  88.  
    v1 := reflect.ValueOf(s.data[j]).Elem()
  89.  
    field1 := v1.FieldByName(s.sortByNum)
  90.  
    if !field1.IsValid() {
  91.  
    log.Panicf("field=%s not valid", s.sortByNum)
  92.  
    }
  93.  
    jdata := fmt.Sprintf("%v", field1.Interface())
  94.  
    num1, err := strconv.ParseInt(jdata, 10, 64)
  95.  
    if err != nil {
  96.  
    log.Panic("please use num when use sortByNum")
  97.  
    }
  98.  
    if esc {
  99.  
    return num < num1
  100.  
    } else {
  101.  
    return num > num1
  102.  
    }
  103.  
     
  104.  
    })
  105.  
    }
  106.  
     
  107.  
    if len(s.sortByStr) > 0 {
  108.  
    sort.Slice(s.data, func(i, j int) bool {
  109.  
    var ifinalv, jfinalv string
  110.  
    for _, key := range s.sortByStr {
  111.  
    v := reflect.ValueOf(s.data[i]).Elem()
  112.  
     
  113.  
    field := v.FieldByName(key)
  114.  
    if !field.IsValid() {
  115.  
    log.Panicf("field=%s not valid", key)
  116.  
    }
  117.  
    idata := fmt.Sprintf("%v", field.Interface())
  118.  
    ifinalv = ifinalv idata
  119.  
    }
  120.  
     
  121.  
    for _, key := range s.sortByStr {
  122.  
    v := reflect.ValueOf(s.data[j]).Elem()
  123.  
     
  124.  
    field := v.FieldByName(key)
  125.  
    if !field.IsValid() {
  126.  
    log.Panicf("field=%s not valid", key)
  127.  
    }
  128.  
    jdata := fmt.Sprintf("%v", field.Interface())
  129.  
    jfinalv = jfinalv jdata
  130.  
    }
  131.  
    // i 大于j的话 返回1 所以正序需要返回false
  132.  
    ret := strings.Compare(ifinalv, jfinalv)
  133.  
    if esc {
  134.  
    return ret < 0
  135.  
    }
  136.  
    return ret >= 0
  137.  
    })
  138.  
    }
  139.  
    return s
  140.  
    }
  141.  
     
  142.  
    // 设置聚合的key
  143.  
    func (s *Stream[T]) KeyBy(key string) *Stream[T] {
  144.  
    s.keyBy = key
  145.  
     
  146.  
    return s
  147.  
    }
  148.  
     
  149.  
    // reduce
  150.  
    // 暂时木有办法改变输出的结构
  151.  
    type reducefunc[F any] func([]F) F
  152.  
     
  153.  
    func (s *Stream[T]) Reduce(reduceFun reducefunc[T]) *Stream[T] {
  154.  
    if s.keyBy == "" {
  155.  
    log.Fatal("please call keyby() before reduce()")
  156.  
    return nil
  157.  
    }
  158.  
    var cache = make(map[string][]T)
  159.  
    defer func() {
  160.  
    cache = nil
  161.  
    }()
  162.  
    for _, item := range s.data {
  163.  
    v := reflect.ValueOf(item).Elem()
  164.  
    field := v.FieldByName(s.keyBy)
  165.  
    key := field.String()
  166.  
    lis, ok := cache[key]
  167.  
    if !ok {
  168.  
    lis = make([]T, 0)
  169.  
    }
  170.  
    lis = append(lis, item)
  171.  
    cache[key] = lis
  172.  
    }
  173.  
    var new []T
  174.  
    for _, lis := range cache {
  175.  
    ret := reduceFun(lis)
  176.  
    new = append(new, ret)
  177.  
    }
  178.  
    s.data = new
  179.  
    return s
  180.  
    }
  181.  
     
  182.  
    // 返回个数
  183.  
    func (s *Stream[T]) Limit(n int) []T {
  184.  
    if n > len(s.data) {
  185.  
    n = len(s.data)
  186.  
    }
  187.  
    return s.data[0:n]
  188.  
    }
  189.  
     
  190.  
    func (s *Stream[T]) Print() {
  191.  
    for idx, item := range s.data {
  192.  
    log.Printf("idx=%d val=%v", idx, item)
  193.  
    }
  194.  
    }
  195.  
     
  196.  
    func (s *Stream[T]) Result() []T {
  197.  
    return s.data
  198.  
    }
学新通

测试例子

  1.  
    func TestTostream(t *testing.T) {
  2.  
    FromElement([]*Student{
  3.  
    &Student{"xyf", "数学", 101},
  4.  
    &Student{"xyf", "语文", 108},
  5.  
    &Student{"xyf", "外语", 101},
  6.  
    }).Map(func(st *Student) *Student {
  7.  
    st.Score = st.Score 10
  8.  
    return st
  9.  
    }).Filter(func(st *Student) bool {
  10.  
    return st.Name == "xyf"
  11.  
    }).
  12.  
    // SortByStr("Name", "Subject").
  13.  
    SortByNum("Score").
  14.  
    Sort(false).
  15.  
    KeyBy("Name").
  16.  
    Reduce(func(st []*Student) *Student {
  17.  
    var ret = &Student{
  18.  
    Name: st[0].Name,
  19.  
    Subject: "all",
  20.  
    }
  21.  
    for _, item := range st {
  22.  
    ret.Score = ret.Score item.Score
  23.  
    }
  24.  
    return ret
  25.  
    }).
  26.  
    Print()
  27.  
    }
学新通

 缺点:golang有点挫的在于不能在方法里面返回新的泛型类型,比如从student返回一个int类型。虽然能通过在struct定义俩个类型 但是万一要生成第三种类型就无能为力了,不可能一直往后加类型吧(这会导致定义类型超级长 写起来超级丑)。

2.通过函数的方式实现(简单举个例子)

  1.  
    type StreamV2[T any] struct {
  2.  
    data []T
  3.  
    }
  4.  
     
  5.  
    func (s StreamV2[T]) Print() {
  6.  
    for i, item := range s.data {
  7.  
    log.Println("idx=", i, " value=", item)
  8.  
    }
  9.  
    }
  10.  
     
  11.  
    func FromElementV2[T any](data []T) Stream[T] {
  12.  
    return Stream[T]{
  13.  
    data: data,
  14.  
    }
  15.  
    }
  16.  
     
  17.  
    func Map[T any, K any](source Stream[T], mapfunc func(data T) K) StreamV2[K] {
  18.  
    var ret []K
  19.  
    for _, item := range source.data {
  20.  
    ret1 := mapfunc(item)
  21.  
    ret = append(ret, ret1)
  22.  
    }
  23.  
    return StreamV2[K]{
  24.  
    data: ret,
  25.  
    }
  26.  
    }
学新通

测试

  1.  
    func TestTostreamv2(t *testing.T) {
  2.  
    stream1 := FromElementV2([]*Student{
  3.  
    &Student{"xyf", "数学", 101},
  4.  
    &Student{"xyf", "语文", 108},
  5.  
    })
  6.  
    stream2 := Map(stream1, func(f *Student) int {
  7.  
    return f.Score
  8.  
    })
  9.  
    stream2.Print()
  10.  
    }

优缺点:这种方式能够将一种容器类型转化为另一种。缺点就是写过java的会吐血(因为搞大数据的朋友都喜欢使用类似builder模式的写法)

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbgkba
系列文章
更多 icon
同类精品
更多 icon
继续加载