前言

在看文章前可以先去 https://zznq.hyuga.icu 体验下。

之前写的代码太拉自己都看不下去一直想重构来着,正好瞅见了 0成本快速创建高匿HTTP/DNS LOG 文章中总结 Hyuga 的优缺点:

Untitled

一次性给它改进下,打造一款:the best tools for monitoring out-of-band traffic.

Hyuga Pro

功能预览

Untitled

移除中间件

这次首先移除中间件redis,持久化采用 leveldb,而用户的oob record存入带有超时缓存cache的 lru 中,代码如下:

type Recorder struct {
	pool     *cache.Cache
	eventbus *event.EventBus
}

func NewRecorder(eventbus *event.EventBus) *Recorder {
	c := cache.New(defaultCacheExpiration, defaultCacheCleanup)
	c.OnEvicted(func(key string, v any) {
		logrus.Debugf("[db][recorder] key:%s deleted", key)
		l, ok := v.(*lru.Cache[int, any])
		if ok {
			putlru(l)
		}
	})

	return &Recorder{
		pool:     c,
		eventbus: eventbus,
	}
}

func (r *Recorder) Record(userid string, v any) error {
	if r.eventbus != nil {
		r.eventbus.Publish(userid, v)
	}

	lru_, ok := r.pool.Get(userid)
	if !ok {
		l := getlru()
		l.Add(0, v)
		r.pool.Set(userid, l, cache.DefaultExpiration)
		return nil
	} else {
		l := lru_.(*lru.Cache[int, any])
		key := l.Len()
		if key >= defaultCacheSize {
			key = l.Keys()[l.Len()-1]
			key++
		}
		l.Add(key, v)
		r.pool.Set(userid, l, cache.DefaultExpiration)
	}

	return nil
}

func (r *Recorder) Get(userid string) ([]any, error) {
	lru_, ok := r.pool.Get(userid)
	if !ok {
		return nil, nil
	}
	l := lru_.(*lru.Cache[int, any])
	if l.Len() == 0 {
		return nil, nil
	}

	res := make([]any, 0, l.Len())
	for _, key := range l.Keys() {
		v, ok := l.Get(key)
		if ok {
			res = append(res, v)
		}
	}
	return res, nil
}

消息推送

实现个简单 eventbus 通过websocket推送到用户页面,也支持第三方应用的消息推送,使用 https://github.com/moonD4rk/notifier 库实现,在服务启动时订阅全部主题,读取用户配置以此推送到第三方应用:

g.Go(func() error {
		// subscribe all event
		s := eventbus.Subscribe("*")
		defer eventbus.Unsubscribe(s)
		for {
			select {
			case <-ctx.Done():
				logrus.Info("[server][notify] shutdown")
				return ctx.Err()
			case msg := <-s.Out():
				r, ok := msg.(oob.Record)
				if !ok {
					continue
				}
				logrus.Infof("[server][notify] eventbus '*' receive msg'%s', '%s'", r.Type.String(), r.Name)

				u, err := db.GetUserBySid(r.Sid)
				if err == nil && u != nil && u.Notify.Enable {
					if u.Notify.Bark.Key != "" {
						notifier.WithBark(u.Notify.Bark.Key, u.Notify.Bark.Server, r.Type.String(), r.Name)
					}
					...
				}
			}
		}
	})

websocket 那里只要订阅用户自己的消息:

func (w *restfulHandler) record(c *gin.Context) {
	ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		logrus.Warnf("[restful] upgrade websocket failed: %v", err)
		return
	}

	sid := c.GetString("sid")
	go func() {
		defer ws.Close()

		logrus.Infof("[restful] start record stream")
		c := make(chan struct{})
		go func() {
			if _, _, err := ws.ReadMessage(); err != nil {
				close(c)
			}
		}()
		// get user all records
		records, err := w.recorder.Get(sid)
		if err != nil {
			logrus.Warnf("[restful] get user records err: %s", err.Error())
			return
		}
		for _, r := range records {
			if err = ws.WriteJSON(r); err != nil {
				logrus.Infof("[restful][stream] push record err: %s", err.Error())
			}
		}
		// subscribe user record event
		s := w.eventbus.Subscribe(sid)
		defer w.eventbus.Unsubscribe(s)
		for {
			select {
			case <-c:
				logrus.Infof("[restful] close record stream")
				return
			case msg := <-s.Out():
				logrus.Infof("[restful][stream] push record msg: %v", msg)
				if err = ws.WriteJSON(msg); err != nil {
					return
				}
			}
		}
	}()
}

部署

Hyuga 通过github action 自动发布,使用 embed 打包好前端资源文件,所以部署特别简单:

  1. 准备一个域名,如 hyuga.icu,准备一台vps,如公网ip 1.1.1.1
Powered by Kali-Team