728x90
반응형

echo 프레임워크로 파일 업로더 서버를 구축했다.

 

쿡북에서 단일 파일, 여러 파일을 업로드하는 예제가 있어 이를 바탕으로 쉽게 만들 수 있었다.

 

하지만 업로드를 하는 클라이언트에서 폼의 키값을 다른 것으로 하는 경우를 고려해야 했다.

 

그래서 여러 파일을 업로드하는 코드를 참고하여 입맛에 맞게 수정했다.

func upload(c echo.Context) error {
    form, err := c.MultipartForm()
    if err != nil {
        return err
    }

    for _, file := range form.File {
        // Source
        src, err := file[0].Open()
        if err != nil {
            return err
        }
        defer src.Close()

        // Destination
        dst, err := os.Create(file[0].Filename)
        if err != nil {
            return err
        }
        defer dst.Close()

        // Copy
        if _, err = io.Copy(dst, src); err != nil {
            return err
        }
    }
}

그리고 용량이 큰 파일을 업로드하는 경우에 디스크 사용량이 파일 사이즈의 2배가 늘어났다.

 

원인을 찾아보니 go에서 설정한 메모리(32 MB + 10MB)보다 큰 경우 /tmp/에 파일을 저장하고 있었다.

 

다음과 같이 하면 파일을 원하는 곳에 복사한 다음 임시 파일을 모두 제거할 수 있다.

func upload(c echo.Context) error {
    defer func() {
        form, err := ctx.MultipartForm()
	    if err != nil {
    	    return
        }
    	form.RemoveAll()
    }()
    ...
}

 

참고 문헌

  1. https://echo.labstack.com/cookbook/file-upload
  2. https://github.com/labstack/echo/blob/master/context.go#L369
  3. https://github.com/golang/go/blob/master/src/mime/multipart/formdata.go#L86
반응형
728x90
반응형

신규 서버를 만들면서 두가지 기능이 필요했다.

 

첫째는 명령어를 동시에 실행해서 속도를 높이는 것과 데이터가 있는 디렉토리를 제거하는 것이었다.

  1. 명령어 동시 실행하기

Golang에서 WaitGroup은 모든 goroutine이 종료될 때까지 기다린다.

 

채널을 통해서 커맨드를 전달하고 goroutine에서 이 커맨드를 실행하고 종료한다.

 

명령어를 단순 반복문으로 수행하면 오래 걸리지만 이를 활용하면 빠르게 수행할 수 있었다.

tasks := make(chan *exec.Cmd, 64)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(w *sync.WaitGroup) {
        defer w.Done()
        for cmd := range tasks {
            out, err := cmd.Output()
            if err != nil {
                fmt.Println("can't get stdout: %v", err)
            }
            fmt.Println(string(out))
        }
    }(&wg)
}
for i := 0; i < 10; i++ {
    tasks <- exec.Command("echo", strconv.Itoa(i))
}
close(tasks)
wg.Wait()
  1. 디렉토리 제거(rm -rf)

간단하게 exec.Command를 활용하면 되겠지만 최대한 Golang 함수로 해결해보려고 했다.

 

그래서 아래와 같이 디렉토리 밑의 데이터를 지우고 마지막으로 디렉토리를 삭제하도록 했다.

func removeContents(dir string) error {
    d, err := os.Open(dir)
    if err != nil {
        return err
    }
    defer d.Close()
    names, err := d.Readdirnames(-1)
    if err != nil {
        return err
    }
    for _, name := range names {
        err = os.RemoveAll(filepath.Join(dir, name))
        if err != nil {
            return err
        }
    }
    syscall.Rmdir(dir)
    return nil
}

 

참고 문헌

  1. https://stackoverflow.com/questions/40247726/go-execute-a-bash-command-n-times-using-goroutines-and-store-print-its-resul

  2. https://stackoverflow.com/questions/33450980/how-to-remove-all-contents-of-a-directory-using-golang

반응형
728x90
반응형
  1. 각 패키지는 단일 목적을 수행하라

  2. 명시적으로 에러를 다뤄라

  3. 깊게 중첩하는 것보다 빠르게 반환하라

  4. 호출자에게 동시성을 맡겨라

  5. goroutine을 실행하기 전, 언제 멈출지 알라

  6. 패키지 수준의 상태를 피하라

  7. 단순함은 중요하다

  8. 패키지 API의 제약을 위해 테스트 코드를 작성하라

  9. 느리다고 생각되면 우선 벤치마크로 증명하라

  10. 중용은 미덕이다

  11. 유지 보수를 생각하라

 

참고 문헌

  1. https://the-zen-of-go.netlify.app
반응형
728x90
반응형

Go로 만든 프로젝트에서 kafka 연동하여 로그를 수집하는데 confluent-kafka-go를 사용하고 있었다.

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

var kafkaClient *kafka.Producer

func main() {
    kafkaClient, _ = kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "broker:9092",
        "acks": 1,
    })

    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                m := ev
                if m.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v, queue: %d\n", m.TopicPartition.Error, p.Len())
                }
                return
            default:
                fmt.Printf("Ignored event: %s\n", ev)
            }
        }
    }()
}

func SendMessage(topic string, message string) {
    kafkaClient.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(message)}
}

테스트를 했을 때 잘 동작하여 아무 문제가 없었지만 며칠간 실행시켜보니 Local: Queue full 에러가 나면서 더이상 메시지를 전송하지 못했다.

queue.buffering.max.messages 크기를 조절해가면서 테스트를 했지만 해당 에러가 계속 발생해 Flush()로 급한 불을 끌 수 있었다.

librdkafka에서 쓰는 버퍼가 해제되지 않은 문제인 듯 했으나 정확한 원인을 몰라 더 이상의 사용이 불안하여 다른 라이브러리를 찾아봤다.

Shopify에서 만든 sarama가 유명하다고 추천받아 테스트를 해보고 적용하기로 했다.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

var kafkaClient sarama.AsyncProducer

func main() {
    brokers := []string{"broker:9092"}
    kafkaClient, _ = sarama.NewAsyncProducer(brokers, nil)
}

func SendMessage(topic string, message string) {
    kafkaClient.Input() <- &sarama.ProducerMessage{
        Topic: topic,
        Partition: -1,
        Value: sarama.StringEncoder(message),
    }
}

테스트는 1000만건의 메시지를 전송하도록 진행했다.

 

confluent-kafka-go와 비교해보니 메모리를 더 적게 사용하면서 성능은 비슷했다.

 

confluent-kafka-go는 메모리를 20MB 정도 기본으로 사용하고 sarma는 최대 3MB를 사용했다.

 

confluent-kafka-go의 경우 Flush를 하지 않으면 GC를 수행해도 메모리가 조금씩 증가했지만

 

sarama는 메모리가 증가하는 것 없이 동작을 잘 하는 것을 확인할 수 있었다.

 

참고 문헌

  1. https://github.com/confluentinc/confluent-kafka-go

  2. https://github.com/Shopify/sarama

반응형

+ Recent posts