mirror of
https://github.com/zeromicro/go-zero.git
synced 2026-05-27 00:25:29 +08:00
Compare commits
173 Commits
tools/goct
...
v1.3.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78ea0769fd | ||
|
|
e0fa8d820d | ||
|
|
dfd58c213c | ||
|
|
83cacf51b7 | ||
|
|
6dccfa29fd | ||
|
|
7e0b0ab0b1 | ||
|
|
ac18cc470d | ||
|
|
f4471846ff | ||
|
|
9c2d526a11 | ||
|
|
2b9fc26c38 | ||
|
|
321dc2d410 | ||
|
|
500bd87c85 | ||
|
|
e9620c8c05 | ||
|
|
70e51bb352 | ||
|
|
278cd123c8 | ||
|
|
3febb1a5d0 | ||
|
|
d8054d8def | ||
|
|
ec271db7a0 | ||
|
|
bbac994c8a | ||
|
|
c1d9e6a00b | ||
|
|
0aeb49a6b0 | ||
|
|
fe262766b4 | ||
|
|
7181505c8a | ||
|
|
f060a226bc | ||
|
|
93d524b797 | ||
|
|
5c169f4f49 | ||
|
|
d29dfa12e3 | ||
|
|
194f55e08e | ||
|
|
c0f9892fe3 | ||
|
|
227104d7d7 | ||
|
|
448029aa4b | ||
|
|
17e0afeac0 | ||
|
|
18916b5189 | ||
|
|
c11a09be23 | ||
|
|
56e1ecf2f3 | ||
|
|
f9e6013a6c | ||
|
|
b5d1d8b0d1 | ||
|
|
09e6d94f9e | ||
|
|
2a5717d7fb | ||
|
|
85cf662c6f | ||
|
|
3279a7ef0f | ||
|
|
fec908a19b | ||
|
|
f5ed0cda58 | ||
|
|
cc9d16f505 | ||
|
|
c05d74b44c | ||
|
|
32c88b6352 | ||
|
|
7dabec260f | ||
|
|
4feb88f9b5 | ||
|
|
2776caed0e | ||
|
|
c55694d957 | ||
|
|
209ffb934b | ||
|
|
26a33932cd | ||
|
|
d6a692971f | ||
|
|
4624390e54 | ||
|
|
63b7d292c1 | ||
|
|
365c569d7c | ||
|
|
68a81fea8a | ||
|
|
08a8bd7ef7 | ||
|
|
b939ce75ba | ||
|
|
3b7ca86e4f | ||
|
|
60760b52ab | ||
|
|
96c128c58a | ||
|
|
0c35f39a7d | ||
|
|
6a66dde0a1 | ||
|
|
36b9fcba44 | ||
|
|
bf99dda620 | ||
|
|
511dfcb409 | ||
|
|
900bc96420 | ||
|
|
be277a7376 | ||
|
|
f15a4f9188 | ||
|
|
e31128650e | ||
|
|
168740b64d | ||
|
|
cc4c4928e0 | ||
|
|
fba6543b23 | ||
|
|
877eb6ac56 | ||
|
|
259a5a13e7 | ||
|
|
cf7c7cb392 | ||
|
|
86d01e2e99 | ||
|
|
7a28e19a27 | ||
|
|
900ea63d68 | ||
|
|
87ab86cdd0 | ||
|
|
0697494ffd | ||
|
|
ffd69a2f5e | ||
|
|
66f10bb5e6 | ||
|
|
8131a0e777 | ||
|
|
32a557dff6 | ||
|
|
db949e40f1 | ||
|
|
e0454138e0 | ||
|
|
3b07ed1b97 | ||
|
|
daa98f5a27 | ||
|
|
842656aa90 | ||
|
|
aa29036cb3 | ||
|
|
607bae27fa | ||
|
|
7c63676be4 | ||
|
|
9e113909b3 | ||
|
|
bd105474ca | ||
|
|
a078f5d764 | ||
|
|
b215fa3ee6 | ||
|
|
50b1928502 | ||
|
|
493e3bcf4b | ||
|
|
6deb80625d | ||
|
|
6ab051568c | ||
|
|
2732d3cdae | ||
|
|
e8c307e4dc | ||
|
|
84ddc660c4 | ||
|
|
e60e707955 | ||
|
|
cf4321b2d0 | ||
|
|
1993faf2f8 | ||
|
|
0ce85376bf | ||
|
|
a40254156f | ||
|
|
05cc62f5ff | ||
|
|
9c2c90e533 | ||
|
|
822ee2e1c5 | ||
|
|
77482c8946 | ||
|
|
7ef0ab3119 | ||
|
|
8bd89a297a | ||
|
|
bb75cc796e | ||
|
|
0fdd8f54eb | ||
|
|
b1ffc464cd | ||
|
|
50174960e4 | ||
|
|
8f46eab977 | ||
|
|
ec299085f5 | ||
|
|
7727d70634 | ||
|
|
5f9d101bc6 | ||
|
|
6c2abe7474 | ||
|
|
14a902c1a7 | ||
|
|
5ad6a6d229 | ||
|
|
6f4b97864a | ||
|
|
0e0abc3a95 | ||
|
|
696fda1db4 | ||
|
|
c1d2634427 | ||
|
|
4b7a680ac5 | ||
|
|
b3e7d2901f | ||
|
|
cdf7ec213c | ||
|
|
f1102fb262 | ||
|
|
09d1fad6e0 | ||
|
|
379c65a3ef | ||
|
|
fdc7f64d6f | ||
|
|
df0f8ed59e | ||
|
|
c903966fc7 | ||
|
|
e57fa8ff53 | ||
|
|
bf2feee5b7 | ||
|
|
ce05c429fc | ||
|
|
272a3f347d | ||
|
|
13db7a1931 | ||
|
|
468c237189 | ||
|
|
b9b80c068b | ||
|
|
9b592b3dee | ||
|
|
2203809e5e | ||
|
|
8d6d37f71e | ||
|
|
ea4f2af67f | ||
|
|
53af194ef9 | ||
|
|
5e0e2d2b14 | ||
|
|
74c99184c5 | ||
|
|
eb4b86137a | ||
|
|
9c4f4f3b4e | ||
|
|
240132e7c7 | ||
|
|
9d67fc4cfb | ||
|
|
892f93a716 | ||
|
|
ba6a7c9dc8 | ||
|
|
a91c3907a8 | ||
|
|
e267d94ee1 | ||
|
|
89ce5e492b | ||
|
|
290de6aa96 | ||
|
|
a7aeb8ac0e | ||
|
|
a8e7fafebf | ||
|
|
7cc64070b1 | ||
|
|
c19d2637ea | ||
|
|
fe1da14332 | ||
|
|
8e9110cedf | ||
|
|
d6ff30a570 | ||
|
|
b98d46bfd6 | ||
|
|
768936b256 |
60
.github/workflows/go.yml
vendored
60
.github/workflows/go.yml
vendored
@@ -7,32 +7,50 @@ on:
|
|||||||
branches: [ master ]
|
branches: [ master ]
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
test-linux:
|
||||||
name: Build
|
name: Linux
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
|
- name: Set up Go 1.x
|
||||||
|
uses: actions/setup-go@v2
|
||||||
|
with:
|
||||||
|
go-version: ^1.15
|
||||||
|
id: go
|
||||||
|
|
||||||
- name: Set up Go 1.x
|
- name: Check out code into the Go module directory
|
||||||
uses: actions/setup-go@v2
|
uses: actions/checkout@v2
|
||||||
with:
|
|
||||||
go-version: ^1.14
|
|
||||||
id: go
|
|
||||||
|
|
||||||
- name: Check out code into the Go module directory
|
- name: Get dependencies
|
||||||
uses: actions/checkout@v2
|
run: |
|
||||||
|
go get -v -t -d ./...
|
||||||
|
|
||||||
- name: Get dependencies
|
- name: Lint
|
||||||
run: |
|
run: |
|
||||||
go get -v -t -d ./...
|
go vet -stdmethods=false $(go list ./...)
|
||||||
|
go install mvdan.cc/gofumpt@latest
|
||||||
|
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
|
||||||
|
|
||||||
- name: Lint
|
- name: Test
|
||||||
run: |
|
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
|
||||||
go vet -stdmethods=false $(go list ./...)
|
|
||||||
go install mvdan.cc/gofumpt@latest
|
|
||||||
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
|
|
||||||
|
|
||||||
- name: Test
|
- name: Codecov
|
||||||
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
|
uses: codecov/codecov-action@v2
|
||||||
|
|
||||||
- name: Codecov
|
test-win:
|
||||||
uses: codecov/codecov-action@v2
|
name: Windows
|
||||||
|
runs-on: windows-latest
|
||||||
|
steps:
|
||||||
|
- name: Set up Go 1.x
|
||||||
|
uses: actions/setup-go@v2
|
||||||
|
with:
|
||||||
|
go-version: ^1.15
|
||||||
|
|
||||||
|
- name: Checkout codebase
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Test
|
||||||
|
run: |
|
||||||
|
go mod verify
|
||||||
|
go mod download
|
||||||
|
go test -v -race ./...
|
||||||
|
cd tools/goctl && go build -v goctl.go
|
||||||
|
|||||||
18
.github/workflows/issue-translator.yml
vendored
Normal file
18
.github/workflows/issue-translator.yml
vendored
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
name: 'issue-translator'
|
||||||
|
on:
|
||||||
|
issue_comment:
|
||||||
|
types: [created]
|
||||||
|
issues:
|
||||||
|
types: [opened]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: tomsun28/issues-translate-action@v2.6
|
||||||
|
with:
|
||||||
|
IS_MODIFY_TITLE: true
|
||||||
|
# not require, default false, . Decide whether to modify the issue title
|
||||||
|
# if true, the robot account @Issues-translate-bot must have modification permissions, invite @Issues-translate-bot to your project or use your custom bot.
|
||||||
|
CUSTOM_BOT_NOTE: Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑🤝🧑👫🧑🏿🤝🧑🏻👩🏾🤝👨🏿👬🏿
|
||||||
|
# not require. Customize the translation robot prefix message.
|
||||||
30
.github/workflows/release.yaml
vendored
30
.github/workflows/release.yaml
vendored
@@ -1,30 +0,0 @@
|
|||||||
on:
|
|
||||||
release:
|
|
||||||
types: [created]
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
releases-matrix:
|
|
||||||
name: Release goctl binary
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
strategy:
|
|
||||||
matrix:
|
|
||||||
# build and publish in parallel: linux/386, linux/amd64, linux/arm64,
|
|
||||||
# windows/386, windows/amd64, windows/arm64, darwin/amd64, darwin/arm64
|
|
||||||
goos: [linux, windows, darwin]
|
|
||||||
goarch: ["386", amd64, arm64]
|
|
||||||
exclude:
|
|
||||||
- goarch: "386"
|
|
||||||
goos: darwin
|
|
||||||
- goarch: "386"
|
|
||||||
goos: windows
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
- uses: wangyoucao577/go-release-action@v1.22
|
|
||||||
with:
|
|
||||||
github_token: ${{ secrets.GITHUB_TOKEN }}
|
|
||||||
goos: ${{ matrix.goos }}
|
|
||||||
goarch: ${{ matrix.goarch }}
|
|
||||||
goversion: "https://dl.google.com/go/go1.17.5.linux-amd64.tar.gz"
|
|
||||||
project_path: "tools/goctl"
|
|
||||||
binary_name: "goctl"
|
|
||||||
extra_files: tools/goctl/goctl.md
|
|
||||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -16,7 +16,8 @@
|
|||||||
**/logs
|
**/logs
|
||||||
|
|
||||||
# for test purpose
|
# for test purpose
|
||||||
adhoc
|
**/adhoc
|
||||||
|
**/testdata
|
||||||
|
|
||||||
# gitlab ci
|
# gitlab ci
|
||||||
.cache
|
.cache
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ We will help you to contribute in different areas like filing issues, developing
|
|||||||
getting your work reviewed and merged.
|
getting your work reviewed and merged.
|
||||||
|
|
||||||
If you have questions about the development process,
|
If you have questions about the development process,
|
||||||
feel free to [file an issue](https://github.com/tal-tech/go-zero/issues/new/choose).
|
feel free to [file an issue](https://github.com/zeromicro/go-zero/issues/new/choose).
|
||||||
|
|
||||||
## Find something to work on
|
## Find something to work on
|
||||||
|
|
||||||
@@ -50,10 +50,10 @@ Here is how you get started.
|
|||||||
|
|
||||||
### Find a good first topic
|
### Find a good first topic
|
||||||
|
|
||||||
[go-zero](https://github.com/tal-tech/go-zero) has beginner-friendly issues that provide a good first issue.
|
[go-zero](https://github.com/zeromicro/go-zero) has beginner-friendly issues that provide a good first issue.
|
||||||
For example, [go-zero](https://github.com/tal-tech/go-zero) has
|
For example, [go-zero](https://github.com/zeromicro/go-zero) has
|
||||||
[help wanted](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
|
[help wanted](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
|
||||||
[good first issue](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
|
[good first issue](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
|
||||||
labels for issues that should not need deep knowledge of the system.
|
labels for issues that should not need deep knowledge of the system.
|
||||||
We can help new contributors who wish to work on such issues.
|
We can help new contributors who wish to work on such issues.
|
||||||
|
|
||||||
@@ -79,7 +79,7 @@ This is a rough outline of what a contributor's workflow looks like:
|
|||||||
- Create a topic branch from where to base the contribution. This is usually master.
|
- Create a topic branch from where to base the contribution. This is usually master.
|
||||||
- Make commits of logical units.
|
- Make commits of logical units.
|
||||||
- Push changes in a topic branch to a personal fork of the repository.
|
- Push changes in a topic branch to a personal fork of the repository.
|
||||||
- Submit a pull request to [go-zero](https://github.com/tal-tech/go-zero).
|
- Submit a pull request to [go-zero](https://github.com/zeromicro/go-zero).
|
||||||
|
|
||||||
## Creating Pull Requests
|
## Creating Pull Requests
|
||||||
|
|
||||||
|
|||||||
2
LICENSE
2
LICENSE
@@ -1,6 +1,6 @@
|
|||||||
MIT License
|
MIT License
|
||||||
|
|
||||||
Copyright (c) 2020 xiaoheiban_server_go
|
Copyright (c) 2022 zeromicro
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
|||||||
12
ROADMAP.md
12
ROADMAP.md
@@ -14,13 +14,15 @@ We hope that the items listed below will inspire further engagement from the com
|
|||||||
|
|
||||||
## 2021 Q4
|
## 2021 Q4
|
||||||
- [x] Support `username/password` authentication in ETCD
|
- [x] Support `username/password` authentication in ETCD
|
||||||
|
- [x] Support `SSL/TLS` in ETCD
|
||||||
- [x] Support `SSL/TLS` in `zRPC`
|
- [x] Support `SSL/TLS` in `zRPC`
|
||||||
- [x] Support `TLS` in redis connections
|
- [x] Support `TLS` in redis connections
|
||||||
|
- [x] Support `goctl bug` to report bugs conveniently
|
||||||
|
|
||||||
## 2022
|
## 2022
|
||||||
- [ ] Support `goctl mock` command to start a mocking server with given `.api` file
|
- [x] Support `context` in redis related methods for timeout and tracing
|
||||||
- [ ] Add `httpx.Client` with governance, like circuit breaker etc.
|
- [x] Support `context` in sql related methods for timeout and tracing
|
||||||
- [ ] Support `goctl doctor` command to report potential issues for given service
|
|
||||||
- [ ] Support `context` in redis related methods for timeout and tracing
|
|
||||||
- [ ] Support `context` in sql related methods for timeout and tracing
|
|
||||||
- [ ] Support `context` in mongodb related methods for timeout and tracing
|
- [ ] Support `context` in mongodb related methods for timeout and tracing
|
||||||
|
- [x] Add `httpc.Do` with HTTP call governance, like circuit breaker etc.
|
||||||
|
- [ ] Support `goctl doctor` command to report potential issues for given service
|
||||||
|
- [ ] Support `goctl mock` command to start a mocking server with given `.api` file
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/hash"
|
"github.com/zeromicro/go-zero/core/hash"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRedisBitSet_New_Set_Test(t *testing.T) {
|
func TestRedisBitSet_New_Set_Test(t *testing.T) {
|
||||||
|
|||||||
@@ -6,11 +6,11 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -171,7 +171,7 @@ func (lt loggedThrottle) allow() (Promise, error) {
|
|||||||
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
|
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
|
||||||
return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
|
return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
|
||||||
accept := acceptable(err)
|
accept := acceptable(err)
|
||||||
if !accept {
|
if !accept && err != nil {
|
||||||
lt.errWin.add(err.Error())
|
lt.errWin.add(err.Error())
|
||||||
}
|
}
|
||||||
return accept
|
return accept
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -7,9 +7,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -8,8 +8,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/iox"
|
"github.com/zeromicro/go-zero/core/iox"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestEnterToContinue(t *testing.T) {
|
func TestEnterToContinue(t *testing.T) {
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrPaddingSize indicates bad padding size.
|
// ErrPaddingSize indicates bad padding size.
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -6,9 +6,9 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -61,3 +61,41 @@ func TestPutMore(t *testing.T) {
|
|||||||
assert.Equal(t, string(element), string(body.([]byte)))
|
assert.Equal(t, string(element), string(body.([]byte)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutMoreWithHeaderNotZero(t *testing.T) {
|
||||||
|
elements := [][]byte{
|
||||||
|
[]byte("hello"),
|
||||||
|
[]byte("world"),
|
||||||
|
[]byte("again"),
|
||||||
|
}
|
||||||
|
queue := NewQueue(4)
|
||||||
|
for i := range elements {
|
||||||
|
queue.Put(elements[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
// take 1
|
||||||
|
body, ok := queue.Take()
|
||||||
|
assert.True(t, ok)
|
||||||
|
element, ok := body.([]byte)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, element, []byte("hello"))
|
||||||
|
|
||||||
|
// put more
|
||||||
|
queue.Put([]byte("b4"))
|
||||||
|
queue.Put([]byte("b5")) // will store in elements[0]
|
||||||
|
queue.Put([]byte("b6")) // cause expansion
|
||||||
|
|
||||||
|
results := [][]byte{
|
||||||
|
[]byte("world"),
|
||||||
|
[]byte("again"),
|
||||||
|
[]byte("b4"),
|
||||||
|
[]byte("b5"),
|
||||||
|
[]byte("b6"),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, element := range results {
|
||||||
|
body, ok := queue.Take()
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, string(element), string(body.([]byte)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const duration = time.Millisecond * 50
|
const duration = time.Millisecond * 50
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSafeMap(t *testing.T) {
|
func TestSafeMap(t *testing.T) {
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
package collection
|
package collection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -5,9 +5,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const drainWorkers = 8
|
const drainWorkers = 8
|
||||||
|
|||||||
@@ -8,10 +8,10 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/mapping"
|
"github.com/zeromicro/go-zero/core/mapping"
|
||||||
)
|
)
|
||||||
|
|
||||||
var loaders = map[string]func([]byte, interface{}) error{
|
var loaders = map[string]func([]byte, interface{}) error{
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
"github.com/tal-tech/go-zero/core/hash"
|
"github.com/zeromicro/go-zero/core/hash"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLoadConfig_notExists(t *testing.T) {
|
func TestLoadConfig_notExists(t *testing.T) {
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/iox"
|
"github.com/zeromicro/go-zero/core/iox"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PropertyError represents a configuration error message.
|
// PropertyError represents a configuration error message.
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestProperties(t *testing.T) {
|
func TestProperties(t *testing.T) {
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package contextx
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/mapping"
|
"github.com/zeromicro/go-zero/core/mapping"
|
||||||
)
|
)
|
||||||
|
|
||||||
const contextTagKey = "ctx"
|
const contextTagKey = "ctx"
|
||||||
|
|||||||
@@ -1,7 +1,14 @@
|
|||||||
package discov
|
package discov
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/discov/internal"
|
import "github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
|
|
||||||
|
// RegisterAccount registers the username/password to the given etcd cluster.
|
||||||
func RegisterAccount(endpoints []string, user, pass string) {
|
func RegisterAccount(endpoints []string, user, pass string) {
|
||||||
internal.AddAccount(endpoints, user, pass)
|
internal.AddAccount(endpoints, user, pass)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterTLS registers the CertFile/CertKeyFile/CACertFile to the given etcd.
|
||||||
|
func RegisterTLS(endpoints []string, certFile, certKeyFile, caFile string,
|
||||||
|
insecureSkipVerify bool) error {
|
||||||
|
return internal.AddTLS(endpoints, certFile, certKeyFile, caFile, insecureSkipVerify)
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRegisterAccount(t *testing.T) {
|
func TestRegisterAccount(t *testing.T) {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
var mockLock sync.Mutex
|
var mockLock sync.Mutex
|
||||||
|
|||||||
@@ -2,12 +2,23 @@ package discov
|
|||||||
|
|
||||||
import "errors"
|
import "errors"
|
||||||
|
|
||||||
|
var (
|
||||||
|
// errEmptyEtcdHosts indicates that etcd hosts are empty.
|
||||||
|
errEmptyEtcdHosts = errors.New("empty etcd hosts")
|
||||||
|
// errEmptyEtcdKey indicates that etcd key is empty.
|
||||||
|
errEmptyEtcdKey = errors.New("empty etcd key")
|
||||||
|
)
|
||||||
|
|
||||||
// EtcdConf is the config item with the given key on etcd.
|
// EtcdConf is the config item with the given key on etcd.
|
||||||
type EtcdConf struct {
|
type EtcdConf struct {
|
||||||
Hosts []string
|
Hosts []string
|
||||||
Key string
|
Key string
|
||||||
User string `json:",optional"`
|
User string `json:",optional"`
|
||||||
Pass string `json:",optional"`
|
Pass string `json:",optional"`
|
||||||
|
CertFile string `json:",optional"`
|
||||||
|
CertKeyFile string `json:",optional=CertFile"`
|
||||||
|
CACertFile string `json:",optional=CertFile"`
|
||||||
|
InsecureSkipVerify bool `json:",optional"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAccount returns if account provided.
|
// HasAccount returns if account provided.
|
||||||
@@ -15,12 +26,17 @@ func (c EtcdConf) HasAccount() bool {
|
|||||||
return len(c.User) > 0 && len(c.Pass) > 0
|
return len(c.User) > 0 && len(c.Pass) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasTLS returns if TLS CertFile/CertKeyFile/CACertFile are provided.
|
||||||
|
func (c EtcdConf) HasTLS() bool {
|
||||||
|
return len(c.CertFile) > 0 && len(c.CertKeyFile) > 0 && len(c.CACertFile) > 0
|
||||||
|
}
|
||||||
|
|
||||||
// Validate validates c.
|
// Validate validates c.
|
||||||
func (c EtcdConf) Validate() error {
|
func (c EtcdConf) Validate() error {
|
||||||
if len(c.Hosts) == 0 {
|
if len(c.Hosts) == 0 {
|
||||||
return errors.New("empty etcd hosts")
|
return errEmptyEtcdHosts
|
||||||
} else if len(c.Key) == 0 {
|
} else if len(c.Key) == 0 {
|
||||||
return errors.New("empty etcd key")
|
return errEmptyEtcdKey
|
||||||
} else {
|
} else {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,17 +1,25 @@
|
|||||||
package internal
|
package internal
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"io/ioutil"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
accounts = make(map[string]Account)
|
||||||
|
tlsConfigs = make(map[string]*tls.Config)
|
||||||
|
lock sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
|
// Account holds the username/password for an etcd cluster.
|
||||||
type Account struct {
|
type Account struct {
|
||||||
User string
|
User string
|
||||||
Pass string
|
Pass string
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
// AddAccount adds the username/password for the given etcd cluster.
|
||||||
accounts = make(map[string]Account)
|
|
||||||
lock sync.RWMutex
|
|
||||||
)
|
|
||||||
|
|
||||||
func AddAccount(endpoints []string, user, pass string) {
|
func AddAccount(endpoints []string, user, pass string) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
@@ -22,6 +30,33 @@ func AddAccount(endpoints []string, user, pass string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddTLS adds the tls cert files for the given etcd cluster.
|
||||||
|
func AddTLS(endpoints []string, certFile, certKeyFile, caFile string, insecureSkipVerify bool) error {
|
||||||
|
cert, err := tls.LoadX509KeyPair(certFile, certKeyFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
caData, err := ioutil.ReadFile(caFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := x509.NewCertPool()
|
||||||
|
pool.AppendCertsFromPEM(caData)
|
||||||
|
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
tlsConfigs[getClusterKey(endpoints)] = &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
RootCAs: pool,
|
||||||
|
InsecureSkipVerify: insecureSkipVerify,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAccount gets the username/password for the given etcd cluster.
|
||||||
func GetAccount(endpoints []string) (Account, bool) {
|
func GetAccount(endpoints []string) (Account, bool) {
|
||||||
lock.RLock()
|
lock.RLock()
|
||||||
defer lock.RUnlock()
|
defer lock.RUnlock()
|
||||||
@@ -29,3 +64,12 @@ func GetAccount(endpoints []string) (Account, bool) {
|
|||||||
account, ok := accounts[getClusterKey(endpoints)]
|
account, ok := accounts[getClusterKey(endpoints)]
|
||||||
return account, ok
|
return account, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetTLS gets the tls config for the given etcd cluster.
|
||||||
|
func GetTLS(endpoints []string) (*tls.Config, bool) {
|
||||||
|
lock.RLock()
|
||||||
|
defer lock.RUnlock()
|
||||||
|
|
||||||
|
cfg, ok := tlsConfigs[getClusterKey(endpoints)]
|
||||||
|
return cfg, ok
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAccount(t *testing.T) {
|
func TestAccount(t *testing.T) {
|
||||||
|
|||||||
@@ -9,11 +9,11 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/contextx"
|
"github.com/zeromicro/go-zero/core/contextx"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -337,6 +337,9 @@ func DialClient(endpoints []string) (EtcdClient, error) {
|
|||||||
cfg.Username = account.User
|
cfg.Username = account.User
|
||||||
cfg.Password = account.Pass
|
cfg.Password = account.Pass
|
||||||
}
|
}
|
||||||
|
if tlsCfg, ok := GetTLS(endpoints); ok {
|
||||||
|
cfg.TLS = tlsCfg
|
||||||
|
}
|
||||||
|
|
||||||
return clientv3.New(cfg)
|
return clientv3.New(cfg)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,10 +7,10 @@ import (
|
|||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/contextx"
|
"github.com/zeromicro/go-zero/core/contextx"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
package discov
|
package discov
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -145,16 +145,23 @@ func (p *Publisher) revoke(cli internal.EtcdClient) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithPubEtcdAccount provides the etcd username/password.
|
|
||||||
func WithPubEtcdAccount(user, pass string) PubOption {
|
|
||||||
return func(pub *Publisher) {
|
|
||||||
internal.AddAccount(pub.endpoints, user, pass)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithId customizes a Publisher with the id.
|
// WithId customizes a Publisher with the id.
|
||||||
func WithId(id int64) PubOption {
|
func WithId(id int64) PubOption {
|
||||||
return func(publisher *Publisher) {
|
return func(publisher *Publisher) {
|
||||||
publisher.id = id
|
publisher.id = id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithPubEtcdAccount provides the etcd username/password.
|
||||||
|
func WithPubEtcdAccount(user, pass string) PubOption {
|
||||||
|
return func(pub *Publisher) {
|
||||||
|
RegisterAccount(pub.endpoints, user, pass)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
|
||||||
|
func WithPubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) PubOption {
|
||||||
|
return func(pub *Publisher) {
|
||||||
|
logx.Must(RegisterTLS(pub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -8,10 +8,10 @@ import (
|
|||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -4,8 +4,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
@@ -58,9 +59,17 @@ func Exclusive() SubOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithSubEtcdAccount provides the etcd username/password.
|
||||||
func WithSubEtcdAccount(user, pass string) SubOption {
|
func WithSubEtcdAccount(user, pass string) SubOption {
|
||||||
return func(sub *Subscriber) {
|
return func(sub *Subscriber) {
|
||||||
internal.AddAccount(sub.endpoints, user, pass)
|
RegisterAccount(sub.endpoints, user, pass)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithSubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
|
||||||
|
func WithSubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) SubOption {
|
||||||
|
return func(sub *Subscriber) {
|
||||||
|
logx.Must(RegisterTLS(sub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -11,10 +11,12 @@ type (
|
|||||||
errorArray []error
|
errorArray []error
|
||||||
)
|
)
|
||||||
|
|
||||||
// Add adds err to be.
|
// Add adds errs to be, nil errors are ignored.
|
||||||
func (be *BatchError) Add(err error) {
|
func (be *BatchError) Add(errs ...error) {
|
||||||
if err != nil {
|
for _, err := range errs {
|
||||||
be.errs = append(be.errs, err)
|
if err != nil {
|
||||||
|
be.errs = append(be.errs, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A DelayExecutor delays a tasks on given delay interval.
|
// A DelayExecutor delays a tasks on given delay interval.
|
||||||
|
|||||||
@@ -3,8 +3,8 @@ package executors
|
|||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A LessExecutor is an executor to limit execution once within given time interval.
|
// A LessExecutor is an executor to limit execution once within given time interval.
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLessExecutor_DoOrDiscard(t *testing.T) {
|
func TestLessExecutor_DoOrDiscard(t *testing.T) {
|
||||||
|
|||||||
@@ -6,11 +6,11 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const idleRound = 10
|
const idleRound = 10
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const threshold = 10
|
const threshold = 10
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSplitLineChunks(t *testing.T) {
|
func TestSplitLineChunks(t *testing.T) {
|
||||||
|
|||||||
@@ -5,6 +5,9 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// errExceedFileSize indicates that the file size is exceeded.
|
||||||
|
var errExceedFileSize = errors.New("exceed file size")
|
||||||
|
|
||||||
// A RangeReader is used to read a range of content from a file.
|
// A RangeReader is used to read a range of content from a file.
|
||||||
type RangeReader struct {
|
type RangeReader struct {
|
||||||
file *os.File
|
file *os.File
|
||||||
@@ -29,7 +32,7 @@ func (rr *RangeReader) Read(p []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if rr.stop < rr.start || rr.start >= stat.Size() {
|
if rr.stop < rr.start || rr.start >= stat.Size() {
|
||||||
return 0, errors.New("exceed file size")
|
return 0, errExceedFileSize
|
||||||
}
|
}
|
||||||
|
|
||||||
if rr.stop-rr.start < int64(len(p)) {
|
if rr.stop-rr.start < int64(len(p)) {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRangeReader(t *testing.T) {
|
func TestRangeReader(t *testing.T) {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/hash"
|
"github.com/zeromicro/go-zero/core/hash"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TempFileWithText creates the temporary file with the given content,
|
// TempFileWithText creates the temporary file with the given content,
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package fx
|
package fx
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/threading"
|
import "github.com/zeromicro/go-zero/core/threading"
|
||||||
|
|
||||||
// Parallel runs fns parallelly and waits for done.
|
// Parallel runs fns parallelly and waits for done.
|
||||||
func Parallel(fns ...func()) {
|
func Parallel(fns ...func()) {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package fx
|
package fx
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/errorx"
|
import "github.com/zeromicro/go-zero/core/errorx"
|
||||||
|
|
||||||
const defaultRetryTimes = 3
|
const defaultRetryTimes = 3
|
||||||
|
|
||||||
|
|||||||
@@ -4,9 +4,9 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -90,6 +90,8 @@ func Range(source <-chan interface{}) Stream {
|
|||||||
func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
|
func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
|
||||||
for item := range s.source {
|
for item := range s.source {
|
||||||
if !predicate(item) {
|
if !predicate(item) {
|
||||||
|
// make sure the former goroutine not block, and current func returns fast.
|
||||||
|
go drain(s.source)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -103,6 +105,8 @@ func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
|
|||||||
func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
|
func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
|
||||||
for item := range s.source {
|
for item := range s.source {
|
||||||
if predicate(item) {
|
if predicate(item) {
|
||||||
|
// make sure the former goroutine not block, and current func returns fast.
|
||||||
|
go drain(s.source)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -186,8 +190,7 @@ func (s Stream) Distinct(fn KeyFunc) Stream {
|
|||||||
|
|
||||||
// Done waits all upstreaming operations to be done.
|
// Done waits all upstreaming operations to be done.
|
||||||
func (s Stream) Done() {
|
func (s Stream) Done() {
|
||||||
for range s.source {
|
drain(s.source)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter filters the items by the given FilterFunc.
|
// Filter filters the items by the given FilterFunc.
|
||||||
@@ -199,9 +202,22 @@ func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream {
|
|||||||
}, opts...)
|
}, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First returns the first item, nil if no items.
|
||||||
|
func (s Stream) First() interface{} {
|
||||||
|
for item := range s.source {
|
||||||
|
// make sure the former goroutine not block, and current func returns fast.
|
||||||
|
go drain(s.source)
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ForAll handles the streaming elements from the source and no later streams.
|
// ForAll handles the streaming elements from the source and no later streams.
|
||||||
func (s Stream) ForAll(fn ForAllFunc) {
|
func (s Stream) ForAll(fn ForAllFunc) {
|
||||||
fn(s.source)
|
fn(s.source)
|
||||||
|
// avoid goroutine leak on fn not consuming all items.
|
||||||
|
go drain(s.source)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
|
// ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
|
||||||
@@ -246,11 +262,14 @@ func (s Stream) Head(n int64) Stream {
|
|||||||
}
|
}
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
// let successive method go ASAP even we have more items to skip
|
// let successive method go ASAP even we have more items to skip
|
||||||
// why we don't just break the loop, because if breaks,
|
|
||||||
// this former goroutine will block forever, which will cause goroutine leak.
|
|
||||||
close(source)
|
close(source)
|
||||||
|
// why we don't just break the loop, and drain to consume all items.
|
||||||
|
// because if breaks, this former goroutine will block forever,
|
||||||
|
// which will cause goroutine leak.
|
||||||
|
drain(s.source)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// not enough items in s.source, but we need to let successive method to go ASAP.
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
close(source)
|
close(source)
|
||||||
}
|
}
|
||||||
@@ -259,6 +278,13 @@ func (s Stream) Head(n int64) Stream {
|
|||||||
return Range(source)
|
return Range(source)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Last returns the last item, or nil if no items.
|
||||||
|
func (s Stream) Last() (item interface{}) {
|
||||||
|
for item = range s.source {
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Map converts each item to another corresponding item, which means it's a 1:1 model.
|
// Map converts each item to another corresponding item, which means it's a 1:1 model.
|
||||||
func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
|
func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
|
||||||
return s.Walk(func(item interface{}, pipe chan<- interface{}) {
|
return s.Walk(func(item interface{}, pipe chan<- interface{}) {
|
||||||
@@ -280,6 +306,21 @@ func (s Stream) Merge() Stream {
|
|||||||
return Range(source)
|
return Range(source)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NoneMatch returns whether all elements of this stream don't match the provided predicate.
|
||||||
|
// May not evaluate the predicate on all elements if not necessary for determining the result.
|
||||||
|
// If the stream is empty then true is returned and the predicate is not evaluated.
|
||||||
|
func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool {
|
||||||
|
for item := range s.source {
|
||||||
|
if predicate(item) {
|
||||||
|
// make sure the former goroutine not block, and current func returns fast.
|
||||||
|
go drain(s.source)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
|
// Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
|
||||||
func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
|
func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
|
||||||
s.Walk(func(item interface{}, pipe chan<- interface{}) {
|
s.Walk(func(item interface{}, pipe chan<- interface{}) {
|
||||||
@@ -411,15 +452,12 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
pool := make(chan lang.PlaceholderType, option.workers)
|
pool := make(chan lang.PlaceholderType, option.workers)
|
||||||
|
|
||||||
for {
|
for item := range s.source {
|
||||||
|
// important, used in another goroutine
|
||||||
|
val := item
|
||||||
pool <- lang.Placeholder
|
pool <- lang.Placeholder
|
||||||
item, ok := <-s.source
|
|
||||||
if !ok {
|
|
||||||
<-pool
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
// better to safely run caller defined method
|
// better to safely run caller defined method
|
||||||
threading.GoSafe(func() {
|
threading.GoSafe(func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -427,7 +465,7 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
|
|||||||
<-pool
|
<-pool
|
||||||
}()
|
}()
|
||||||
|
|
||||||
fn(item, pipe)
|
fn(val, pipe)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -439,22 +477,19 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
|
func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
|
||||||
pipe := make(chan interface{}, defaultWorkers)
|
pipe := make(chan interface{}, option.workers)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for {
|
for item := range s.source {
|
||||||
item, ok := <-s.source
|
// important, used in another goroutine
|
||||||
if !ok {
|
val := item
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
// better to safely run caller defined method
|
// better to safely run caller defined method
|
||||||
threading.GoSafe(func() {
|
threading.GoSafe(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
fn(item, pipe)
|
fn(val, pipe)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -465,14 +500,14 @@ func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
|
|||||||
return Range(pipe)
|
return Range(pipe)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnlimitedWorkers lets the caller to use as many workers as the tasks.
|
// UnlimitedWorkers lets the caller use as many workers as the tasks.
|
||||||
func UnlimitedWorkers() Option {
|
func UnlimitedWorkers() Option {
|
||||||
return func(opts *rxOptions) {
|
return func(opts *rxOptions) {
|
||||||
opts.unlimitedWorkers = true
|
opts.unlimitedWorkers = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithWorkers lets the caller to customize the concurrent workers.
|
// WithWorkers lets the caller customize the concurrent workers.
|
||||||
func WithWorkers(workers int) Option {
|
func WithWorkers(workers int) Option {
|
||||||
return func(opts *rxOptions) {
|
return func(opts *rxOptions) {
|
||||||
if workers < minWorkers {
|
if workers < minWorkers {
|
||||||
@@ -483,6 +518,7 @@ func WithWorkers(workers int) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildOptions returns a rxOptions with given customizations.
|
||||||
func buildOptions(opts ...Option) *rxOptions {
|
func buildOptions(opts ...Option) *rxOptions {
|
||||||
options := newOptions()
|
options := newOptions()
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@@ -492,6 +528,13 @@ func buildOptions(opts ...Option) *rxOptions {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// drain drains the given channel.
|
||||||
|
func drain(channel <-chan interface{}) {
|
||||||
|
for range channel {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newOptions returns a default rxOptions.
|
||||||
func newOptions() *rxOptions {
|
func newOptions() *rxOptions {
|
||||||
return &rxOptions{
|
return &rxOptions{
|
||||||
workers: defaultWorkers,
|
workers: defaultWorkers,
|
||||||
|
|||||||
@@ -13,324 +13,494 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBuffer(t *testing.T) {
|
func TestBuffer(t *testing.T) {
|
||||||
const N = 5
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
var count int32
|
const N = 5
|
||||||
var wait sync.WaitGroup
|
var count int32
|
||||||
wait.Add(1)
|
var wait sync.WaitGroup
|
||||||
From(func(source chan<- interface{}) {
|
wait.Add(1)
|
||||||
ticker := time.NewTicker(10 * time.Millisecond)
|
From(func(source chan<- interface{}) {
|
||||||
defer ticker.Stop()
|
ticker := time.NewTicker(10 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
for i := 0; i < 2*N; i++ {
|
for i := 0; i < 2*N; i++ {
|
||||||
select {
|
select {
|
||||||
case source <- i:
|
case source <- i:
|
||||||
atomic.AddInt32(&count, 1)
|
atomic.AddInt32(&count, 1)
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
wait.Done()
|
wait.Done()
|
||||||
return
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}).Buffer(N).ForAll(func(pipe <-chan interface{}) {
|
||||||
}).Buffer(N).ForAll(func(pipe <-chan interface{}) {
|
wait.Wait()
|
||||||
wait.Wait()
|
// why N+1, because take one more to wait for sending into the channel
|
||||||
// why N+1, because take one more to wait for sending into the channel
|
assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
|
||||||
assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBufferNegative(t *testing.T) {
|
func TestBufferNegative(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
var result int
|
||||||
for item := range pipe {
|
Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
result += item.(int)
|
for item := range pipe {
|
||||||
}
|
result += item.(int)
|
||||||
return result, nil
|
}
|
||||||
|
return result, nil
|
||||||
|
})
|
||||||
|
assert.Equal(t, 10, result)
|
||||||
})
|
})
|
||||||
assert.Equal(t, 10, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCount(t *testing.T) {
|
func TestCount(t *testing.T) {
|
||||||
tests := []struct {
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
name string
|
tests := []struct {
|
||||||
elements []interface{}
|
name string
|
||||||
}{
|
elements []interface{}
|
||||||
{
|
}{
|
||||||
name: "no elements with nil",
|
{
|
||||||
},
|
name: "no elements with nil",
|
||||||
{
|
},
|
||||||
name: "no elements",
|
{
|
||||||
elements: []interface{}{},
|
name: "no elements",
|
||||||
},
|
elements: []interface{}{},
|
||||||
{
|
},
|
||||||
name: "1 element",
|
{
|
||||||
elements: []interface{}{1},
|
name: "1 element",
|
||||||
},
|
elements: []interface{}{1},
|
||||||
{
|
},
|
||||||
name: "multiple elements",
|
{
|
||||||
elements: []interface{}{1, 2, 3},
|
name: "multiple elements",
|
||||||
},
|
elements: []interface{}{1, 2, 3},
|
||||||
}
|
},
|
||||||
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
val := Just(test.elements...).Count()
|
val := Just(test.elements...).Count()
|
||||||
assert.Equal(t, len(test.elements), val)
|
assert.Equal(t, len(test.elements), val)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDone(t *testing.T) {
|
func TestDone(t *testing.T) {
|
||||||
var count int32
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
|
var count int32
|
||||||
time.Sleep(time.Millisecond * 100)
|
Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
|
||||||
atomic.AddInt32(&count, int32(item.(int)))
|
time.Sleep(time.Millisecond * 100)
|
||||||
}).Done()
|
atomic.AddInt32(&count, int32(item.(int)))
|
||||||
assert.Equal(t, int32(6), count)
|
}).Done()
|
||||||
|
assert.Equal(t, int32(6), count)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestJust(t *testing.T) {
|
func TestJust(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
var result int
|
||||||
for item := range pipe {
|
Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
result += item.(int)
|
for item := range pipe {
|
||||||
}
|
result += item.(int)
|
||||||
return result, nil
|
}
|
||||||
|
return result, nil
|
||||||
|
})
|
||||||
|
assert.Equal(t, 10, result)
|
||||||
})
|
})
|
||||||
assert.Equal(t, 10, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDistinct(t *testing.T) {
|
func TestDistinct(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
|
var result int
|
||||||
return item
|
Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
|
||||||
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
return item
|
||||||
for item := range pipe {
|
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
result += item.(int)
|
for item := range pipe {
|
||||||
}
|
result += item.(int)
|
||||||
return result, nil
|
}
|
||||||
|
return result, nil
|
||||||
|
})
|
||||||
|
assert.Equal(t, 10, result)
|
||||||
})
|
})
|
||||||
assert.Equal(t, 10, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFilter(t *testing.T) {
|
func TestFilter(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
|
var result int
|
||||||
return item.(int)%2 == 0
|
Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
|
||||||
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
return item.(int)%2 == 0
|
||||||
for item := range pipe {
|
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
result += item.(int)
|
for item := range pipe {
|
||||||
}
|
result += item.(int)
|
||||||
return result, nil
|
}
|
||||||
|
return result, nil
|
||||||
|
})
|
||||||
|
assert.Equal(t, 6, result)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFirst(t *testing.T) {
|
||||||
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
assert.Nil(t, Just().First())
|
||||||
|
assert.Equal(t, "foo", Just("foo").First())
|
||||||
|
assert.Equal(t, "foo", Just("foo", "bar").First())
|
||||||
})
|
})
|
||||||
assert.Equal(t, 6, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestForAll(t *testing.T) {
|
func TestForAll(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
|
var result int
|
||||||
return item.(int)%2 == 0
|
Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
|
||||||
}).ForAll(func(pipe <-chan interface{}) {
|
return item.(int)%2 == 0
|
||||||
for item := range pipe {
|
}).ForAll(func(pipe <-chan interface{}) {
|
||||||
result += item.(int)
|
for item := range pipe {
|
||||||
}
|
result += item.(int)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
assert.Equal(t, 6, result)
|
||||||
})
|
})
|
||||||
assert.Equal(t, 6, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGroup(t *testing.T) {
|
func TestGroup(t *testing.T) {
|
||||||
var groups [][]int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
|
var groups [][]int
|
||||||
v := item.(int)
|
Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
|
||||||
return v / 10
|
v := item.(int)
|
||||||
}).ForEach(func(item interface{}) {
|
return v / 10
|
||||||
v := item.([]interface{})
|
}).ForEach(func(item interface{}) {
|
||||||
var group []int
|
v := item.([]interface{})
|
||||||
for _, each := range v {
|
var group []int
|
||||||
group = append(group, each.(int))
|
for _, each := range v {
|
||||||
}
|
group = append(group, each.(int))
|
||||||
groups = append(groups, group)
|
}
|
||||||
})
|
groups = append(groups, group)
|
||||||
|
})
|
||||||
|
|
||||||
assert.Equal(t, 2, len(groups))
|
assert.Equal(t, 2, len(groups))
|
||||||
for _, group := range groups {
|
for _, group := range groups {
|
||||||
assert.Equal(t, 2, len(group))
|
assert.Equal(t, 2, len(group))
|
||||||
assert.True(t, group[0]/10 == group[1]/10)
|
assert.True(t, group[0]/10 == group[1]/10)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHead(t *testing.T) {
|
func TestHead(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
var result int
|
||||||
for item := range pipe {
|
Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
result += item.(int)
|
for item := range pipe {
|
||||||
}
|
result += item.(int)
|
||||||
return result, nil
|
}
|
||||||
|
return result, nil
|
||||||
|
})
|
||||||
|
assert.Equal(t, 3, result)
|
||||||
})
|
})
|
||||||
assert.Equal(t, 3, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadZero(t *testing.T) {
|
func TestHeadZero(t *testing.T) {
|
||||||
assert.Panics(t, func() {
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
assert.Panics(t, func() {
|
||||||
return nil, nil
|
Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadMore(t *testing.T) {
|
func TestHeadMore(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
var result int
|
||||||
for item := range pipe {
|
Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
result += item.(int)
|
for item := range pipe {
|
||||||
}
|
result += item.(int)
|
||||||
return result, nil
|
}
|
||||||
|
return result, nil
|
||||||
|
})
|
||||||
|
assert.Equal(t, 10, result)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLast(t *testing.T) {
|
||||||
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
goroutines := runtime.NumGoroutine()
|
||||||
|
assert.Nil(t, Just().Last())
|
||||||
|
assert.Equal(t, "foo", Just("foo").Last())
|
||||||
|
assert.Equal(t, "bar", Just("foo", "bar").Last())
|
||||||
|
// let scheduler schedule first
|
||||||
|
runtime.Gosched()
|
||||||
|
assert.Equal(t, goroutines, runtime.NumGoroutine())
|
||||||
})
|
})
|
||||||
assert.Equal(t, 10, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMap(t *testing.T) {
|
func TestMap(t *testing.T) {
|
||||||
log.SetOutput(ioutil.Discard)
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
log.SetOutput(ioutil.Discard)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
mapper MapFunc
|
mapper MapFunc
|
||||||
expect int
|
expect int
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
mapper: func(item interface{}) interface{} {
|
mapper: func(item interface{}) interface{} {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
return v * v
|
return v * v
|
||||||
|
},
|
||||||
|
expect: 30,
|
||||||
},
|
},
|
||||||
expect: 30,
|
{
|
||||||
},
|
mapper: func(item interface{}) interface{} {
|
||||||
{
|
v := item.(int)
|
||||||
mapper: func(item interface{}) interface{} {
|
if v%2 == 0 {
|
||||||
v := item.(int)
|
return 0
|
||||||
if v%2 == 0 {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return v * v
|
|
||||||
},
|
|
||||||
expect: 10,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}) interface{} {
|
|
||||||
v := item.(int)
|
|
||||||
if v%2 == 0 {
|
|
||||||
panic(v)
|
|
||||||
}
|
|
||||||
return v * v
|
|
||||||
},
|
|
||||||
expect: 10,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Map(...) works even WithWorkers(0)
|
|
||||||
for i, test := range tests {
|
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
|
||||||
var result int
|
|
||||||
var workers int
|
|
||||||
if i%2 == 0 {
|
|
||||||
workers = 0
|
|
||||||
} else {
|
|
||||||
workers = runtime.NumCPU()
|
|
||||||
}
|
|
||||||
From(func(source chan<- interface{}) {
|
|
||||||
for i := 1; i < 5; i++ {
|
|
||||||
source <- i
|
|
||||||
}
|
|
||||||
}).Map(test.mapper, WithWorkers(workers)).Reduce(
|
|
||||||
func(pipe <-chan interface{}) (interface{}, error) {
|
|
||||||
for item := range pipe {
|
|
||||||
result += item.(int)
|
|
||||||
}
|
}
|
||||||
return result, nil
|
return v * v
|
||||||
})
|
},
|
||||||
|
expect: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
mapper: func(item interface{}) interface{} {
|
||||||
|
v := item.(int)
|
||||||
|
if v%2 == 0 {
|
||||||
|
panic(v)
|
||||||
|
}
|
||||||
|
return v * v
|
||||||
|
},
|
||||||
|
expect: 10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
assert.Equal(t, test.expect, result)
|
// Map(...) works even WithWorkers(0)
|
||||||
})
|
for i, test := range tests {
|
||||||
}
|
t.Run(stringx.Rand(), func(t *testing.T) {
|
||||||
|
var result int
|
||||||
|
var workers int
|
||||||
|
if i%2 == 0 {
|
||||||
|
workers = 0
|
||||||
|
} else {
|
||||||
|
workers = runtime.NumCPU()
|
||||||
|
}
|
||||||
|
From(func(source chan<- interface{}) {
|
||||||
|
for i := 1; i < 5; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}).Map(test.mapper, WithWorkers(workers)).Reduce(
|
||||||
|
func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
|
for item := range pipe {
|
||||||
|
result += item.(int)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Equal(t, test.expect, result)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMerge(t *testing.T) {
|
func TestMerge(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
|
Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
|
||||||
|
assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParallelJust(t *testing.T) {
|
func TestParallelJust(t *testing.T) {
|
||||||
var count int32
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3).Parallel(func(item interface{}) {
|
var count int32
|
||||||
time.Sleep(time.Millisecond * 100)
|
Just(1, 2, 3).Parallel(func(item interface{}) {
|
||||||
atomic.AddInt32(&count, int32(item.(int)))
|
time.Sleep(time.Millisecond * 100)
|
||||||
}, UnlimitedWorkers())
|
atomic.AddInt32(&count, int32(item.(int)))
|
||||||
assert.Equal(t, int32(6), count)
|
}, UnlimitedWorkers())
|
||||||
|
assert.Equal(t, int32(6), count)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReverse(t *testing.T) {
|
func TestReverse(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
|
Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
|
||||||
|
assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSort(t *testing.T) {
|
func TestSort(t *testing.T) {
|
||||||
var prev int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
|
var prev int
|
||||||
return a.(int) < b.(int)
|
Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
|
||||||
}).ForEach(func(item interface{}) {
|
return a.(int) < b.(int)
|
||||||
next := item.(int)
|
}).ForEach(func(item interface{}) {
|
||||||
assert.True(t, prev < next)
|
next := item.(int)
|
||||||
prev = next
|
assert.True(t, prev < next)
|
||||||
|
prev = next
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSplit(t *testing.T) {
|
func TestSplit(t *testing.T) {
|
||||||
assert.Panics(t, func() {
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
|
assert.Panics(t, func() {
|
||||||
|
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
|
||||||
|
})
|
||||||
|
var chunks [][]interface{}
|
||||||
|
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
|
||||||
|
chunk := item.([]interface{})
|
||||||
|
chunks = append(chunks, chunk)
|
||||||
|
})
|
||||||
|
assert.EqualValues(t, [][]interface{}{
|
||||||
|
{1, 2, 3, 4},
|
||||||
|
{5, 6, 7, 8},
|
||||||
|
{9, 10},
|
||||||
|
}, chunks)
|
||||||
})
|
})
|
||||||
var chunks [][]interface{}
|
|
||||||
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
|
|
||||||
chunk := item.([]interface{})
|
|
||||||
chunks = append(chunks, chunk)
|
|
||||||
})
|
|
||||||
assert.EqualValues(t, [][]interface{}{
|
|
||||||
{1, 2, 3, 4},
|
|
||||||
{5, 6, 7, 8},
|
|
||||||
{9, 10},
|
|
||||||
}, chunks)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTail(t *testing.T) {
|
func TestTail(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
var result int
|
||||||
for item := range pipe {
|
Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
result += item.(int)
|
for item := range pipe {
|
||||||
}
|
result += item.(int)
|
||||||
return result, nil
|
}
|
||||||
|
return result, nil
|
||||||
|
})
|
||||||
|
assert.Equal(t, 7, result)
|
||||||
})
|
})
|
||||||
assert.Equal(t, 7, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTailZero(t *testing.T) {
|
func TestTailZero(t *testing.T) {
|
||||||
assert.Panics(t, func() {
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
assert.Panics(t, func() {
|
||||||
return nil, nil
|
Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWalk(t *testing.T) {
|
func TestWalk(t *testing.T) {
|
||||||
var result int
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
|
var result int
|
||||||
if item.(int)%2 != 0 {
|
Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
|
||||||
pipe <- item
|
if item.(int)%2 != 0 {
|
||||||
}
|
pipe <- item
|
||||||
}, UnlimitedWorkers()).ForEach(func(item interface{}) {
|
}
|
||||||
result += item.(int)
|
}, UnlimitedWorkers()).ForEach(func(item interface{}) {
|
||||||
|
result += item.(int)
|
||||||
|
})
|
||||||
|
assert.Equal(t, 9, result)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStream_AnyMach(t *testing.T) {
|
||||||
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
|
||||||
|
return item.(int) == 4
|
||||||
|
}))
|
||||||
|
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
|
||||||
|
return item.(int) == 0
|
||||||
|
}))
|
||||||
|
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
|
||||||
|
return item.(int) == 2
|
||||||
|
}))
|
||||||
|
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
|
||||||
|
return item.(int) == 2
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStream_AllMach(t *testing.T) {
|
||||||
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
assetEqual(
|
||||||
|
t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
|
||||||
|
return true
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
assetEqual(
|
||||||
|
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
|
||||||
|
return false
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
assetEqual(
|
||||||
|
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
|
||||||
|
return item.(int) == 1
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStream_NoneMatch(t *testing.T) {
|
||||||
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
assetEqual(
|
||||||
|
t, true, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
|
||||||
|
return false
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
assetEqual(
|
||||||
|
t, false, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
|
||||||
|
return true
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
assetEqual(
|
||||||
|
t, true, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
|
||||||
|
return item.(int) == 4
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcat(t *testing.T) {
|
||||||
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
a1 := []interface{}{1, 2, 3}
|
||||||
|
a2 := []interface{}{4, 5, 6}
|
||||||
|
s1 := Just(a1...)
|
||||||
|
s2 := Just(a2...)
|
||||||
|
stream := Concat(s1, s2)
|
||||||
|
var items []interface{}
|
||||||
|
for item := range stream.source {
|
||||||
|
items = append(items, item)
|
||||||
|
}
|
||||||
|
sort.Slice(items, func(i, j int) bool {
|
||||||
|
return items[i].(int) < items[j].(int)
|
||||||
|
})
|
||||||
|
ints := make([]interface{}, 0)
|
||||||
|
ints = append(ints, a1...)
|
||||||
|
ints = append(ints, a2...)
|
||||||
|
assetEqual(t, ints, items)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStream_Skip(t *testing.T) {
|
||||||
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
|
||||||
|
assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
|
||||||
|
assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
|
||||||
|
equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
|
||||||
|
assert.Panics(t, func() {
|
||||||
|
Just(1, 2, 3, 4).Skip(-1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStream_Concat(t *testing.T) {
|
||||||
|
runCheckedTest(t, func(t *testing.T) {
|
||||||
|
stream := Just(1).Concat(Just(2), Just(3))
|
||||||
|
var items []interface{}
|
||||||
|
for item := range stream.source {
|
||||||
|
items = append(items, item)
|
||||||
|
}
|
||||||
|
sort.Slice(items, func(i, j int) bool {
|
||||||
|
return items[i].(int) < items[j].(int)
|
||||||
|
})
|
||||||
|
assetEqual(t, []interface{}{1, 2, 3}, items)
|
||||||
|
|
||||||
|
just := Just(1)
|
||||||
|
equal(t, just.Concat(just), []interface{}{1})
|
||||||
})
|
})
|
||||||
assert.Equal(t, 9, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkParallelMapReduce(b *testing.B) {
|
func BenchmarkParallelMapReduce(b *testing.B) {
|
||||||
@@ -377,6 +547,12 @@ func BenchmarkMapReduce(b *testing.B) {
|
|||||||
}).Map(mapper).Reduce(reducer)
|
}).Map(mapper).Reduce(reducer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func assetEqual(t *testing.T, except, data interface{}) {
|
||||||
|
if !reflect.DeepEqual(except, data) {
|
||||||
|
t.Errorf(" %v, want %v", data, except)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func equal(t *testing.T, stream Stream, data []interface{}) {
|
func equal(t *testing.T, stream Stream, data []interface{}) {
|
||||||
items := make([]interface{}, 0)
|
items := make([]interface{}, 0)
|
||||||
for item := range stream.source {
|
for item := range stream.source {
|
||||||
@@ -387,85 +563,7 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func assetEqual(t *testing.T, except, data interface{}) {
|
func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
|
||||||
if !reflect.DeepEqual(except, data) {
|
defer goleak.VerifyNone(t)
|
||||||
t.Errorf(" %v, want %v", data, except)
|
fn(t)
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStream_AnyMach(t *testing.T) {
|
|
||||||
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
|
|
||||||
return item.(int) == 4
|
|
||||||
}))
|
|
||||||
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
|
|
||||||
return item.(int) == 0
|
|
||||||
}))
|
|
||||||
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
|
|
||||||
return item.(int) == 2
|
|
||||||
}))
|
|
||||||
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
|
|
||||||
return item.(int) == 2
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStream_AllMach(t *testing.T) {
|
|
||||||
assetEqual(
|
|
||||||
t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
|
|
||||||
return true
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
assetEqual(
|
|
||||||
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
|
|
||||||
return false
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
assetEqual(
|
|
||||||
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
|
|
||||||
return item.(int) == 1
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConcat(t *testing.T) {
|
|
||||||
a1 := []interface{}{1, 2, 3}
|
|
||||||
a2 := []interface{}{4, 5, 6}
|
|
||||||
s1 := Just(a1...)
|
|
||||||
s2 := Just(a2...)
|
|
||||||
stream := Concat(s1, s2)
|
|
||||||
var items []interface{}
|
|
||||||
for item := range stream.source {
|
|
||||||
items = append(items, item)
|
|
||||||
}
|
|
||||||
sort.Slice(items, func(i, j int) bool {
|
|
||||||
return items[i].(int) < items[j].(int)
|
|
||||||
})
|
|
||||||
ints := make([]interface{}, 0)
|
|
||||||
ints = append(ints, a1...)
|
|
||||||
ints = append(ints, a2...)
|
|
||||||
assetEqual(t, ints, items)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStream_Skip(t *testing.T) {
|
|
||||||
assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
|
|
||||||
assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
|
|
||||||
assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
|
|
||||||
equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
|
|
||||||
assert.Panics(t, func() {
|
|
||||||
Just(1, 2, 3, 4).Skip(-1)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStream_Concat(t *testing.T) {
|
|
||||||
stream := Just(1).Concat(Just(2), Just(3))
|
|
||||||
var items []interface{}
|
|
||||||
for item := range stream.source {
|
|
||||||
items = append(items, item)
|
|
||||||
}
|
|
||||||
sort.Slice(items, func(i, j int) bool {
|
|
||||||
return items[i].(int) < items[j].(int)
|
|
||||||
})
|
|
||||||
assetEqual(t, []interface{}{1, 2, 3}, items)
|
|
||||||
|
|
||||||
just := Just(1)
|
|
||||||
equal(t, just.Concat(just), []interface{}{1})
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/mapping"
|
"github.com/zeromicro/go-zero/core/mapping"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -9,8 +9,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestReadText(t *testing.T) {
|
func TestReadText(t *testing.T) {
|
||||||
|
|||||||
@@ -51,5 +51,5 @@ func unmarshalUseNumber(decoder *json.Decoder, v interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func formatError(v string, err error) error {
|
func formatError(v string, err error) error {
|
||||||
return fmt.Errorf("string: `%s`, error: `%s`", v, err.Error())
|
return fmt.Errorf("string: `%s`, error: `%w`", v, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ package lang
|
|||||||
var Placeholder PlaceholderType
|
var Placeholder PlaceholderType
|
||||||
|
|
||||||
type (
|
type (
|
||||||
// GenericType can be used to hold any type.
|
// AnyType can be used to hold any type.
|
||||||
GenericType = interface{}
|
AnyType = interface{}
|
||||||
// PlaceholderType represents a placeholder type.
|
// PlaceholderType represents a placeholder type.
|
||||||
PlaceholderType = struct{}
|
PlaceholderType = struct{}
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -5,26 +5,23 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
|
||||||
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
|
const periodScript = `local limit = tonumber(ARGV[1])
|
||||||
periodScript = `local limit = tonumber(ARGV[1])
|
|
||||||
local window = tonumber(ARGV[2])
|
local window = tonumber(ARGV[2])
|
||||||
local current = redis.call("INCRBY", KEYS[1], 1)
|
local current = redis.call("INCRBY", KEYS[1], 1)
|
||||||
if current == 1 then
|
if current == 1 then
|
||||||
redis.call("expire", KEYS[1], window)
|
redis.call("expire", KEYS[1], window)
|
||||||
return 1
|
end
|
||||||
elseif current < limit then
|
if current < limit then
|
||||||
return 1
|
return 1
|
||||||
elseif current == limit then
|
elseif current == limit then
|
||||||
return 2
|
return 2
|
||||||
else
|
else
|
||||||
return 0
|
return 0
|
||||||
end`
|
end`
|
||||||
zoneDiff = 3600 * 8 // GMT+8 for our services
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Unknown means not initialized state.
|
// Unknown means not initialized state.
|
||||||
@@ -104,7 +101,9 @@ func (h *PeriodLimit) Take(key string) (int, error) {
|
|||||||
|
|
||||||
func (h *PeriodLimit) calcExpireSeconds() int {
|
func (h *PeriodLimit) calcExpireSeconds() int {
|
||||||
if h.align {
|
if h.align {
|
||||||
unix := time.Now().Unix() + zoneDiff
|
now := time.Now()
|
||||||
|
_, offset := now.Zone()
|
||||||
|
unix := now.Unix() + int64(offset)
|
||||||
return h.period - int(unix%int64(h.period))
|
return h.period - int(unix%int64(h.period))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,6 +111,8 @@ func (h *PeriodLimit) calcExpireSeconds() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Align returns a func to customize a PeriodLimit with alignment.
|
// Align returns a func to customize a PeriodLimit with alignment.
|
||||||
|
// For example, if we want to limit end users with 5 sms verification messages every day,
|
||||||
|
// we need to align with the local timezone and the start of the day.
|
||||||
func Align() PeriodOption {
|
func Align() PeriodOption {
|
||||||
return func(l *PeriodLimit) {
|
return func(l *PeriodLimit) {
|
||||||
l.align = true
|
l.align = true
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ import (
|
|||||||
|
|
||||||
"github.com/alicebob/miniredis/v2"
|
"github.com/alicebob/miniredis/v2"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPeriodLimit_Take(t *testing.T) {
|
func TestPeriodLimit_Take(t *testing.T) {
|
||||||
@@ -23,10 +23,9 @@ func TestPeriodLimit_RedisUnavailable(t *testing.T) {
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
seconds = 1
|
seconds = 1
|
||||||
total = 100
|
|
||||||
quota = 5
|
quota = 5
|
||||||
)
|
)
|
||||||
l := NewPeriodLimit(seconds, quota, redis.NewRedis(s.Addr(), redis.NodeType), "periodlimit")
|
l := NewPeriodLimit(seconds, quota, redis.New(s.Addr()), "periodlimit")
|
||||||
s.Close()
|
s.Close()
|
||||||
val, err := l.Take("first")
|
val, err := l.Take("first")
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
@@ -66,3 +65,13 @@ func testPeriodLimit(t *testing.T, opts ...PeriodOption) {
|
|||||||
assert.Equal(t, 1, hitQuota)
|
assert.Equal(t, 1, hitQuota)
|
||||||
assert.Equal(t, total-quota, overQuota)
|
assert.Equal(t, total-quota, overQuota)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestQuotaFull(t *testing.T) {
|
||||||
|
s, err := miniredis.Run()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
l := NewPeriodLimit(1, 1, redis.New(s.Addr()), "periodlimit")
|
||||||
|
val, err := l.Take("first")
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, HitQuota, val)
|
||||||
|
}
|
||||||
|
|||||||
@@ -7,8 +7,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
xrate "golang.org/x/time/rate"
|
xrate "golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -85,8 +85,8 @@ func (lim *TokenLimiter) Allow() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AllowN reports whether n events may happen at time now.
|
// AllowN reports whether n events may happen at time now.
|
||||||
// Use this method if you intend to drop / skip events that exceed the rate rate.
|
// Use this method if you intend to drop / skip events that exceed the rate.
|
||||||
// Otherwise use Reserve or Wait.
|
// Otherwise, use Reserve or Wait.
|
||||||
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
|
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
|
||||||
return lim.reserveN(now, n)
|
return lim.reserveN(now, n)
|
||||||
}
|
}
|
||||||
@@ -112,7 +112,8 @@ func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
|
|||||||
// Lua boolean false -> r Nil bulk reply
|
// Lua boolean false -> r Nil bulk reply
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
return false
|
return false
|
||||||
} else if err != nil {
|
}
|
||||||
|
if err != nil {
|
||||||
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
|
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
|
||||||
lim.startMonitor()
|
lim.startMonitor()
|
||||||
return lim.rescueLimiter.AllowN(now, n)
|
return lim.rescueLimiter.AllowN(now, n)
|
||||||
|
|||||||
@@ -6,9 +6,9 @@ import (
|
|||||||
|
|
||||||
"github.com/alicebob/miniredis/v2"
|
"github.com/alicebob/miniredis/v2"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -7,11 +7,11 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -8,11 +8,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package load
|
|||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A ShedderGroup is a manager to manage key based shedders.
|
// A ShedderGroup is a manager to manage key based shedders.
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -3,10 +3,11 @@ package logx
|
|||||||
// A LogConf is a logging config.
|
// A LogConf is a logging config.
|
||||||
type LogConf struct {
|
type LogConf struct {
|
||||||
ServiceName string `json:",optional"`
|
ServiceName string `json:",optional"`
|
||||||
Mode string `json:",default=console,options=console|file|volume"`
|
Mode string `json:",default=console,options=[console,file,volume]"`
|
||||||
|
Encoding string `json:",default=json,options=[json,plain]"`
|
||||||
TimeFormat string `json:",optional"`
|
TimeFormat string `json:",optional"`
|
||||||
Path string `json:",default=logs"`
|
Path string `json:",default=logs"`
|
||||||
Level string `json:",default=info,options=info|error|severe"`
|
Level string `json:",default=info,options=[info,error,severe]"`
|
||||||
Compress bool `json:",optional"`
|
Compress bool `json:",optional"`
|
||||||
KeepDays int `json:",optional"`
|
KeepDays int `json:",optional"`
|
||||||
StackCooldownMillis int `json:",default=100"`
|
StackCooldownMillis int `json:",default=100"`
|
||||||
|
|||||||
@@ -3,9 +3,10 @@ package logx
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const durationCallerDepth = 3
|
const durationCallerDepth = 3
|
||||||
@@ -79,10 +80,15 @@ func (l *durationLogger) WithDuration(duration time.Duration) Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
|
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
|
||||||
outputJson(writer, &durationLogger{
|
switch atomic.LoadUint32(&encoding) {
|
||||||
Timestamp: getTimestamp(),
|
case plainEncodingType:
|
||||||
Level: level,
|
writePlainAny(writer, level, val, l.Duration)
|
||||||
Content: val,
|
default:
|
||||||
Duration: l.Duration,
|
outputJson(writer, &durationLogger{
|
||||||
})
|
Timestamp: getTimestamp(),
|
||||||
|
Level: level,
|
||||||
|
Content: val,
|
||||||
|
Duration: l.Duration,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package logx
|
|||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -37,6 +38,19 @@ func TestWithDurationInfo(t *testing.T) {
|
|||||||
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
|
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWithDurationInfoConsole(t *testing.T) {
|
||||||
|
old := atomic.LoadUint32(&encoding)
|
||||||
|
atomic.StoreUint32(&encoding, plainEncodingType)
|
||||||
|
defer func() {
|
||||||
|
atomic.StoreUint32(&encoding, old)
|
||||||
|
}()
|
||||||
|
|
||||||
|
var builder strings.Builder
|
||||||
|
log.SetOutput(&builder)
|
||||||
|
WithDuration(time.Second).Info("foo")
|
||||||
|
assert.True(t, strings.Contains(builder.String(), "ms"), builder.String())
|
||||||
|
}
|
||||||
|
|
||||||
func TestWithDurationInfof(t *testing.T) {
|
func TestWithDurationInfof(t *testing.T) {
|
||||||
var builder strings.Builder
|
var builder strings.Builder
|
||||||
log.SetOutput(&builder)
|
log.SetOutput(&builder)
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
type limitedExecutor struct {
|
type limitedExecutor struct {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLimitedExecutor_logOrDiscard(t *testing.T) {
|
func TestLimitedExecutor_logOrDiscard(t *testing.T) {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package logx
|
package logx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -17,9 +18,9 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/iox"
|
"github.com/zeromicro/go-zero/core/iox"
|
||||||
"github.com/tal-tech/go-zero/core/sysx"
|
"github.com/zeromicro/go-zero/core/sysx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -31,6 +32,15 @@ const (
|
|||||||
SevereLevel
|
SevereLevel
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
jsonEncodingType = iota
|
||||||
|
plainEncodingType
|
||||||
|
|
||||||
|
jsonEncoding = "json"
|
||||||
|
plainEncoding = "plain"
|
||||||
|
plainEncodingSep = '\t'
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
accessFilename = "access.log"
|
accessFilename = "access.log"
|
||||||
errorFilename = "error.log"
|
errorFilename = "error.log"
|
||||||
@@ -62,9 +72,10 @@ var (
|
|||||||
// ErrLogServiceNameNotSet is an error that indicates that the service name is not set.
|
// ErrLogServiceNameNotSet is an error that indicates that the service name is not set.
|
||||||
ErrLogServiceNameNotSet = errors.New("log service name must be set")
|
ErrLogServiceNameNotSet = errors.New("log service name must be set")
|
||||||
|
|
||||||
timeFormat = "2006-01-02T15:04:05.000Z07"
|
timeFormat = "2006-01-02T15:04:05.000Z07:00"
|
||||||
writeConsole bool
|
writeConsole bool
|
||||||
logLevel uint32
|
logLevel uint32
|
||||||
|
encoding uint32 = jsonEncodingType
|
||||||
// use uint32 for atomic operations
|
// use uint32 for atomic operations
|
||||||
disableStat uint32
|
disableStat uint32
|
||||||
infoLog io.WriteCloser
|
infoLog io.WriteCloser
|
||||||
@@ -124,6 +135,12 @@ func SetUp(c LogConf) error {
|
|||||||
if len(c.TimeFormat) > 0 {
|
if len(c.TimeFormat) > 0 {
|
||||||
timeFormat = c.TimeFormat
|
timeFormat = c.TimeFormat
|
||||||
}
|
}
|
||||||
|
switch c.Encoding {
|
||||||
|
case plainEncoding:
|
||||||
|
atomic.StoreUint32(&encoding, plainEncodingType)
|
||||||
|
default:
|
||||||
|
atomic.StoreUint32(&encoding, jsonEncodingType)
|
||||||
|
}
|
||||||
|
|
||||||
switch c.Mode {
|
switch c.Mode {
|
||||||
case consoleMode:
|
case consoleMode:
|
||||||
@@ -407,21 +424,31 @@ func infoTextSync(msg string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func outputAny(writer io.Writer, level string, val interface{}) {
|
func outputAny(writer io.Writer, level string, val interface{}) {
|
||||||
info := logEntry{
|
switch atomic.LoadUint32(&encoding) {
|
||||||
Timestamp: getTimestamp(),
|
case plainEncodingType:
|
||||||
Level: level,
|
writePlainAny(writer, level, val)
|
||||||
Content: val,
|
default:
|
||||||
|
info := logEntry{
|
||||||
|
Timestamp: getTimestamp(),
|
||||||
|
Level: level,
|
||||||
|
Content: val,
|
||||||
|
}
|
||||||
|
outputJson(writer, info)
|
||||||
}
|
}
|
||||||
outputJson(writer, info)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func outputText(writer io.Writer, level, msg string) {
|
func outputText(writer io.Writer, level, msg string) {
|
||||||
info := logEntry{
|
switch atomic.LoadUint32(&encoding) {
|
||||||
Timestamp: getTimestamp(),
|
case plainEncodingType:
|
||||||
Level: level,
|
writePlainText(writer, level, msg)
|
||||||
Content: msg,
|
default:
|
||||||
|
info := logEntry{
|
||||||
|
Timestamp: getTimestamp(),
|
||||||
|
Level: level,
|
||||||
|
Content: msg,
|
||||||
|
}
|
||||||
|
outputJson(writer, info)
|
||||||
}
|
}
|
||||||
outputJson(writer, info)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func outputError(writer io.Writer, msg string, callDepth int) {
|
func outputError(writer io.Writer, msg string, callDepth int) {
|
||||||
@@ -565,6 +592,62 @@ func statSync(msg string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writePlainAny(writer io.Writer, level string, val interface{}, fields ...string) {
|
||||||
|
switch v := val.(type) {
|
||||||
|
case string:
|
||||||
|
writePlainText(writer, level, v, fields...)
|
||||||
|
case error:
|
||||||
|
writePlainText(writer, level, v.Error(), fields...)
|
||||||
|
case fmt.Stringer:
|
||||||
|
writePlainText(writer, level, v.String(), fields...)
|
||||||
|
default:
|
||||||
|
var buf bytes.Buffer
|
||||||
|
buf.WriteString(getTimestamp())
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(level)
|
||||||
|
for _, item := range fields {
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(item)
|
||||||
|
}
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
if err := json.NewEncoder(&buf).Encode(val); err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
buf.WriteByte('\n')
|
||||||
|
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
|
||||||
|
log.Println(buf.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := writer.Write(buf.Bytes()); err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writePlainText(writer io.Writer, level, msg string, fields ...string) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
buf.WriteString(getTimestamp())
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(level)
|
||||||
|
for _, item := range fields {
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(item)
|
||||||
|
}
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(msg)
|
||||||
|
buf.WriteByte('\n')
|
||||||
|
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
|
||||||
|
log.Println(buf.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := writer.Write(buf.Bytes()); err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type logWriter struct {
|
type logWriter struct {
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -141,6 +141,78 @@ func TestStructedLogInfov(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleAny(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := atomic.LoadUint32(&encoding)
|
||||||
|
atomic.StoreUint32(&encoding, plainEncodingType)
|
||||||
|
defer func() {
|
||||||
|
atomic.StoreUint32(&encoding, old)
|
||||||
|
}()
|
||||||
|
|
||||||
|
Infov(v)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleAnyString(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := atomic.LoadUint32(&encoding)
|
||||||
|
atomic.StoreUint32(&encoding, plainEncodingType)
|
||||||
|
defer func() {
|
||||||
|
atomic.StoreUint32(&encoding, old)
|
||||||
|
}()
|
||||||
|
|
||||||
|
Infov(fmt.Sprint(v...))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleAnyError(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := atomic.LoadUint32(&encoding)
|
||||||
|
atomic.StoreUint32(&encoding, plainEncodingType)
|
||||||
|
defer func() {
|
||||||
|
atomic.StoreUint32(&encoding, old)
|
||||||
|
}()
|
||||||
|
|
||||||
|
Infov(errors.New(fmt.Sprint(v...)))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleAnyStringer(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := atomic.LoadUint32(&encoding)
|
||||||
|
atomic.StoreUint32(&encoding, plainEncodingType)
|
||||||
|
defer func() {
|
||||||
|
atomic.StoreUint32(&encoding, old)
|
||||||
|
}()
|
||||||
|
|
||||||
|
Infov(ValStringer{
|
||||||
|
val: fmt.Sprint(v...),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleText(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := atomic.LoadUint32(&encoding)
|
||||||
|
atomic.StoreUint32(&encoding, plainEncodingType)
|
||||||
|
defer func() {
|
||||||
|
atomic.StoreUint32(&encoding, old)
|
||||||
|
}()
|
||||||
|
|
||||||
|
Info(fmt.Sprint(v...))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestStructedLogSlow(t *testing.T) {
|
func TestStructedLogSlow(t *testing.T) {
|
||||||
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
|
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
|
||||||
slowLog = writer
|
slowLog = writer
|
||||||
@@ -432,6 +504,17 @@ func doTestStructedLog(t *testing.T, level string, setup func(writer io.WriteClo
|
|||||||
assert.True(t, strings.Contains(val, message))
|
assert.True(t, strings.Contains(val, message))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func doTestStructedLogConsole(t *testing.T, setup func(writer io.WriteCloser),
|
||||||
|
write func(...interface{})) {
|
||||||
|
const message = "hello there"
|
||||||
|
writer := new(mockWriter)
|
||||||
|
setup(writer)
|
||||||
|
atomic.StoreUint32(&initialized, 1)
|
||||||
|
write(message)
|
||||||
|
println(writer.String())
|
||||||
|
assert.True(t, strings.Contains(writer.String(), message))
|
||||||
|
}
|
||||||
|
|
||||||
func testSetLevelTwiceWithMode(t *testing.T, mode string) {
|
func testSetLevelTwiceWithMode(t *testing.T, mode string) {
|
||||||
SetUp(LogConf{
|
SetUp(LogConf{
|
||||||
Mode: mode,
|
Mode: mode,
|
||||||
@@ -456,3 +539,11 @@ func testSetLevelTwiceWithMode(t *testing.T, mode string) {
|
|||||||
ErrorStackf(message)
|
ErrorStackf(message)
|
||||||
assert.Equal(t, 0, writer.builder.Len())
|
assert.Equal(t, 0, writer.builder.Len())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ValStringer struct {
|
||||||
|
val string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ValStringer) String() string {
|
||||||
|
return v.val
|
||||||
|
}
|
||||||
|
|||||||
@@ -13,9 +13,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDailyRotateRuleMarkRotated(t *testing.T) {
|
func TestDailyRotateRuleMarkRotated(t *testing.T) {
|
||||||
|
|||||||
@@ -29,9 +29,9 @@ func TestRedirector(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func captureOutput(f func()) string {
|
func captureOutput(f func()) string {
|
||||||
atomic.StoreUint32(&initialized, 1)
|
|
||||||
writer := new(mockWriter)
|
writer := new(mockWriter)
|
||||||
infoLog = writer
|
infoLog = writer
|
||||||
|
atomic.StoreUint32(&initialized, 1)
|
||||||
|
|
||||||
prevLevel := atomic.LoadUint32(&logLevel)
|
prevLevel := atomic.LoadUint32(&logLevel)
|
||||||
SetLevel(InfoLevel)
|
SetLevel(InfoLevel)
|
||||||
@@ -44,5 +44,9 @@ func captureOutput(f func()) string {
|
|||||||
func getContent(jsonStr string) string {
|
func getContent(jsonStr string) string {
|
||||||
var entry logEntry
|
var entry logEntry
|
||||||
json.Unmarshal([]byte(jsonStr), &entry)
|
json.Unmarshal([]byte(jsonStr), &entry)
|
||||||
return entry.Content.(string)
|
val, ok := entry.Content.(string)
|
||||||
|
if ok {
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
return ""
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,9 +4,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -77,16 +78,24 @@ func (l *traceLogger) WithDuration(duration time.Duration) Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
|
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
|
||||||
outputJson(writer, &traceLogger{
|
traceID := traceIdFromContext(l.ctx)
|
||||||
logEntry: logEntry{
|
spanID := spanIdFromContext(l.ctx)
|
||||||
Timestamp: getTimestamp(),
|
|
||||||
Level: level,
|
switch atomic.LoadUint32(&encoding) {
|
||||||
Duration: l.Duration,
|
case plainEncodingType:
|
||||||
Content: val,
|
writePlainAny(writer, level, val, l.Duration, traceID, spanID)
|
||||||
},
|
default:
|
||||||
Trace: traceIdFromContext(l.ctx),
|
outputJson(writer, &traceLogger{
|
||||||
Span: spanIdFromContext(l.ctx),
|
logEntry: logEntry{
|
||||||
})
|
Timestamp: getTimestamp(),
|
||||||
|
Level: level,
|
||||||
|
Duration: l.Duration,
|
||||||
|
Content: val,
|
||||||
|
},
|
||||||
|
Trace: traceID,
|
||||||
|
Span: spanID,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithContext sets ctx to log, for keeping tracing information.
|
// WithContext sets ctx to log, for keeping tracing information.
|
||||||
|
|||||||
@@ -82,6 +82,37 @@ func TestTraceInfo(t *testing.T) {
|
|||||||
assert.True(t, strings.Contains(buf.String(), spanKey))
|
assert.True(t, strings.Contains(buf.String(), spanKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTraceInfoConsole(t *testing.T) {
|
||||||
|
old := atomic.LoadUint32(&encoding)
|
||||||
|
atomic.StoreUint32(&encoding, jsonEncodingType)
|
||||||
|
defer func() {
|
||||||
|
atomic.StoreUint32(&encoding, old)
|
||||||
|
}()
|
||||||
|
|
||||||
|
var buf mockWriter
|
||||||
|
atomic.StoreUint32(&initialized, 1)
|
||||||
|
infoLog = newLogWriter(log.New(&buf, "", flags))
|
||||||
|
otp := otel.GetTracerProvider()
|
||||||
|
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
|
||||||
|
otel.SetTracerProvider(tp)
|
||||||
|
defer otel.SetTracerProvider(otp)
|
||||||
|
|
||||||
|
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
|
||||||
|
l := WithContext(ctx).(*traceLogger)
|
||||||
|
SetLevel(InfoLevel)
|
||||||
|
l.WithDuration(time.Second).Info(testlog)
|
||||||
|
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||||
|
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||||
|
buf.Reset()
|
||||||
|
l.WithDuration(time.Second).Infof(testlog)
|
||||||
|
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||||
|
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||||
|
buf.Reset()
|
||||||
|
l.WithDuration(time.Second).Infov(testlog)
|
||||||
|
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||||
|
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||||
|
}
|
||||||
|
|
||||||
func TestTraceSlow(t *testing.T) {
|
func TestTraceSlow(t *testing.T) {
|
||||||
var buf mockWriter
|
var buf mockWriter
|
||||||
atomic.StoreUint32(&initialized, 1)
|
atomic.StoreUint32(&initialized, 1)
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package mapping
|
|||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/jsonx"
|
"github.com/zeromicro/go-zero/core/jsonx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const jsonTagKey = "json"
|
const jsonTagKey = "json"
|
||||||
|
|||||||
@@ -9,9 +9,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/jsonx"
|
"github.com/zeromicro/go-zero/core/jsonx"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -448,7 +448,15 @@ func (u *Unmarshaler) fillSlice(fieldType reflect.Type, value reflect.Value, map
|
|||||||
dereffedBaseType := Deref(baseType)
|
dereffedBaseType := Deref(baseType)
|
||||||
dereffedBaseKind := dereffedBaseType.Kind()
|
dereffedBaseKind := dereffedBaseType.Kind()
|
||||||
refValue := reflect.ValueOf(mapValue)
|
refValue := reflect.ValueOf(mapValue)
|
||||||
|
if refValue.IsNil() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
conv := reflect.MakeSlice(reflect.SliceOf(baseType), refValue.Len(), refValue.Cap())
|
conv := reflect.MakeSlice(reflect.SliceOf(baseType), refValue.Len(), refValue.Cap())
|
||||||
|
if refValue.Len() == 0 {
|
||||||
|
value.Set(conv)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var valid bool
|
var valid bool
|
||||||
for i := 0; i < refValue.Len(); i++ {
|
for i := 0; i < refValue.Len(); i++ {
|
||||||
@@ -742,7 +750,9 @@ func getValueWithChainedKeys(m Valuer, keys []string) (interface{}, bool) {
|
|||||||
if len(keys) == 1 {
|
if len(keys) == 1 {
|
||||||
v, ok := m.Value(keys[0])
|
v, ok := m.Value(keys[0])
|
||||||
return v, ok
|
return v, ok
|
||||||
} else if len(keys) > 1 {
|
}
|
||||||
|
|
||||||
|
if len(keys) > 1 {
|
||||||
if v, ok := m.Value(keys[0]); ok {
|
if v, ok := m.Value(keys[0]); ok {
|
||||||
if nextm, ok := v.(map[string]interface{}); ok {
|
if nextm, ok := v.(map[string]interface{}); ok {
|
||||||
return getValueWithChainedKeys(MapValuer(nextm), keys[1:])
|
return getValueWithChainedKeys(MapValuer(nextm), keys[1:])
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// because json.Number doesn't support strconv.ParseUint(...),
|
// because json.Number doesn't support strconv.ParseUint(...),
|
||||||
@@ -198,6 +198,49 @@ func TestUnmarshalIntWithDefault(t *testing.T) {
|
|||||||
assert.Equal(t, 1, in.Int)
|
assert.Equal(t, 1, in.Int)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUnmarshalBoolSliceRequired(t *testing.T) {
|
||||||
|
type inner struct {
|
||||||
|
Bools []bool `key:"bools"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var in inner
|
||||||
|
assert.NotNil(t, UnmarshalKey(map[string]interface{}{}, &in))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnmarshalBoolSliceNil(t *testing.T) {
|
||||||
|
type inner struct {
|
||||||
|
Bools []bool `key:"bools,optional"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var in inner
|
||||||
|
assert.Nil(t, UnmarshalKey(map[string]interface{}{}, &in))
|
||||||
|
assert.Nil(t, in.Bools)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnmarshalBoolSliceNilExplicit(t *testing.T) {
|
||||||
|
type inner struct {
|
||||||
|
Bools []bool `key:"bools,optional"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var in inner
|
||||||
|
assert.Nil(t, UnmarshalKey(map[string]interface{}{
|
||||||
|
"bools": nil,
|
||||||
|
}, &in))
|
||||||
|
assert.Nil(t, in.Bools)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnmarshalBoolSliceEmpty(t *testing.T) {
|
||||||
|
type inner struct {
|
||||||
|
Bools []bool `key:"bools,optional"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var in inner
|
||||||
|
assert.Nil(t, UnmarshalKey(map[string]interface{}{
|
||||||
|
"bools": []bool{},
|
||||||
|
}, &in))
|
||||||
|
assert.Empty(t, in.Bools)
|
||||||
|
}
|
||||||
|
|
||||||
func TestUnmarshalBoolSliceWithDefault(t *testing.T) {
|
func TestUnmarshalBoolSliceWithDefault(t *testing.T) {
|
||||||
type inner struct {
|
type inner struct {
|
||||||
Bools []bool `key:"bools,default=[true,false]"`
|
Bools []bool `key:"bools,default=[true,false]"`
|
||||||
@@ -330,28 +373,34 @@ func TestUnmarshalFloat(t *testing.T) {
|
|||||||
|
|
||||||
func TestUnmarshalInt64Slice(t *testing.T) {
|
func TestUnmarshalInt64Slice(t *testing.T) {
|
||||||
var v struct {
|
var v struct {
|
||||||
Ages []int64 `key:"ages"`
|
Ages []int64 `key:"ages"`
|
||||||
|
Slice []int64 `key:"slice"`
|
||||||
}
|
}
|
||||||
m := map[string]interface{}{
|
m := map[string]interface{}{
|
||||||
"ages": []int64{1, 2},
|
"ages": []int64{1, 2},
|
||||||
|
"slice": []interface{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
ast := assert.New(t)
|
ast := assert.New(t)
|
||||||
ast.Nil(UnmarshalKey(m, &v))
|
ast.Nil(UnmarshalKey(m, &v))
|
||||||
ast.ElementsMatch([]int64{1, 2}, v.Ages)
|
ast.ElementsMatch([]int64{1, 2}, v.Ages)
|
||||||
|
ast.Equal([]int64{}, v.Slice)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUnmarshalIntSlice(t *testing.T) {
|
func TestUnmarshalIntSlice(t *testing.T) {
|
||||||
var v struct {
|
var v struct {
|
||||||
Ages []int `key:"ages"`
|
Ages []int `key:"ages"`
|
||||||
|
Slice []int `key:"slice"`
|
||||||
}
|
}
|
||||||
m := map[string]interface{}{
|
m := map[string]interface{}{
|
||||||
"ages": []int{1, 2},
|
"ages": []int{1, 2},
|
||||||
|
"slice": []interface{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
ast := assert.New(t)
|
ast := assert.New(t)
|
||||||
ast.Nil(UnmarshalKey(m, &v))
|
ast.Nil(UnmarshalKey(m, &v))
|
||||||
ast.ElementsMatch([]int{1, 2}, v.Ages)
|
ast.ElementsMatch([]int{1, 2}, v.Ages)
|
||||||
|
ast.Equal([]int{}, v.Slice)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUnmarshalString(t *testing.T) {
|
func TestUnmarshalString(t *testing.T) {
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
|
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
@@ -14,7 +13,7 @@ const yamlTagKey = "json"
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrUnsupportedType is an error that indicates the config format is not supported.
|
// ErrUnsupportedType is an error that indicates the config format is not supported.
|
||||||
ErrUnsupportedType = errors.New("only map-like configs are suported")
|
ErrUnsupportedType = errors.New("only map-like configs are supported")
|
||||||
|
|
||||||
yamlUnmarshaler = NewUnmarshaler(yamlTagKey)
|
yamlUnmarshaler = NewUnmarshaler(yamlTagKey)
|
||||||
)
|
)
|
||||||
@@ -29,39 +28,6 @@ func UnmarshalYamlReader(reader io.Reader, v interface{}) error {
|
|||||||
return unmarshalYamlReader(reader, v, yamlUnmarshaler)
|
return unmarshalYamlReader(reader, v, yamlUnmarshaler)
|
||||||
}
|
}
|
||||||
|
|
||||||
func unmarshalYamlBytes(content []byte, v interface{}, unmarshaler *Unmarshaler) error {
|
|
||||||
var o interface{}
|
|
||||||
if err := yamlUnmarshal(content, &o); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if m, ok := o.(map[string]interface{}); ok {
|
|
||||||
return unmarshaler.Unmarshal(m, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return ErrUnsupportedType
|
|
||||||
}
|
|
||||||
|
|
||||||
func unmarshalYamlReader(reader io.Reader, v interface{}, unmarshaler *Unmarshaler) error {
|
|
||||||
content, err := ioutil.ReadAll(reader)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return unmarshalYamlBytes(content, v, unmarshaler)
|
|
||||||
}
|
|
||||||
|
|
||||||
// yamlUnmarshal YAML to map[string]interface{} instead of map[interface{}]interface{}.
|
|
||||||
func yamlUnmarshal(in []byte, out interface{}) error {
|
|
||||||
var res interface{}
|
|
||||||
if err := yaml.Unmarshal(in, &res); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
*out.(*interface{}) = cleanupMapValue(res)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func cleanupInterfaceMap(in map[interface{}]interface{}) map[string]interface{} {
|
func cleanupInterfaceMap(in map[interface{}]interface{}) map[string]interface{} {
|
||||||
res := make(map[string]interface{})
|
res := make(map[string]interface{})
|
||||||
for k, v := range in {
|
for k, v := range in {
|
||||||
@@ -96,3 +62,40 @@ func cleanupMapValue(v interface{}) interface{} {
|
|||||||
return Repr(v)
|
return Repr(v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func unmarshal(unmarshaler *Unmarshaler, o interface{}, v interface{}) error {
|
||||||
|
if m, ok := o.(map[string]interface{}); ok {
|
||||||
|
return unmarshaler.Unmarshal(m, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ErrUnsupportedType
|
||||||
|
}
|
||||||
|
|
||||||
|
func unmarshalYamlBytes(content []byte, v interface{}, unmarshaler *Unmarshaler) error {
|
||||||
|
var o interface{}
|
||||||
|
if err := yamlUnmarshal(content, &o); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return unmarshal(unmarshaler, o, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func unmarshalYamlReader(reader io.Reader, v interface{}, unmarshaler *Unmarshaler) error {
|
||||||
|
var res interface{}
|
||||||
|
if err := yaml.NewDecoder(reader).Decode(&res); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return unmarshal(unmarshaler, cleanupMapValue(res), v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// yamlUnmarshal YAML to map[string]interface{} instead of map[interface{}]interface{}.
|
||||||
|
func yamlUnmarshal(in []byte, out interface{}) error {
|
||||||
|
var res interface{}
|
||||||
|
if err := yaml.Unmarshal(in, &res); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*out.(*interface{}) = cleanupMapValue(res)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -926,14 +926,17 @@ func TestUnmarshalYamlBytesError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUnmarshalYamlReaderError(t *testing.T) {
|
func TestUnmarshalYamlReaderError(t *testing.T) {
|
||||||
payload := `abcd: cdef`
|
|
||||||
reader := strings.NewReader(payload)
|
|
||||||
var v struct {
|
var v struct {
|
||||||
Any string
|
Any string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reader := strings.NewReader(`abcd: cdef`)
|
||||||
err := UnmarshalYamlReader(reader, &v)
|
err := UnmarshalYamlReader(reader, &v)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
|
|
||||||
|
reader = strings.NewReader("chenquan")
|
||||||
|
err = UnmarshalYamlReader(reader, &v)
|
||||||
|
assert.ErrorIs(t, err, ErrUnsupportedType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUnmarshalYamlBadReader(t *testing.T) {
|
func TestUnmarshalYamlBadReader(t *testing.T) {
|
||||||
@@ -1011,6 +1014,6 @@ func TestUnmarshalYamlMapRune(t *testing.T) {
|
|||||||
|
|
||||||
type badReader struct{}
|
type badReader struct{}
|
||||||
|
|
||||||
func (b *badReader) Read(p []byte) (n int, err error) {
|
func (b *badReader) Read(_ []byte) (n int, err error) {
|
||||||
return 0, io.ErrLimitReached
|
return 0, io.ErrLimitReached
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMaxInt(t *testing.T) {
|
func TestMaxInt(t *testing.T) {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package metric
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
prom "github.com/prometheus/client_golang/prometheus"
|
prom "github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package metric
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
prom "github.com/prometheus/client_golang/prometheus"
|
prom "github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package metric
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
prom "github.com/prometheus/client_golang/prometheus"
|
prom "github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -3,13 +3,11 @@ package mr
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/errorx"
|
"github.com/zeromicro/go-zero/core/errorx"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -25,12 +23,12 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
// ForEachFunc is used to do element processing, but no output.
|
||||||
|
ForEachFunc func(item interface{})
|
||||||
// GenerateFunc is used to let callers send elements into source.
|
// GenerateFunc is used to let callers send elements into source.
|
||||||
GenerateFunc func(source chan<- interface{})
|
GenerateFunc func(source chan<- interface{})
|
||||||
// MapFunc is used to do element processing and write the output to writer.
|
// MapFunc is used to do element processing and write the output to writer.
|
||||||
MapFunc func(item interface{}, writer Writer)
|
MapFunc func(item interface{}, writer Writer)
|
||||||
// VoidMapFunc is used to do element processing, but no output.
|
|
||||||
VoidMapFunc func(item interface{})
|
|
||||||
// MapperFunc is used to do element processing and write the output to writer,
|
// MapperFunc is used to do element processing and write the output to writer,
|
||||||
// use cancel func to cancel the processing.
|
// use cancel func to cancel the processing.
|
||||||
MapperFunc func(item interface{}, writer Writer, cancel func(error))
|
MapperFunc func(item interface{}, writer Writer, cancel func(error))
|
||||||
@@ -43,6 +41,16 @@ type (
|
|||||||
// Option defines the method to customize the mapreduce.
|
// Option defines the method to customize the mapreduce.
|
||||||
Option func(opts *mapReduceOptions)
|
Option func(opts *mapReduceOptions)
|
||||||
|
|
||||||
|
mapperContext struct {
|
||||||
|
ctx context.Context
|
||||||
|
mapper MapFunc
|
||||||
|
source <-chan interface{}
|
||||||
|
panicChan *onceChan
|
||||||
|
collector chan<- interface{}
|
||||||
|
doneChan <-chan lang.PlaceholderType
|
||||||
|
workers int
|
||||||
|
}
|
||||||
|
|
||||||
mapReduceOptions struct {
|
mapReduceOptions struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
workers int
|
workers int
|
||||||
@@ -70,7 +78,6 @@ func Finish(fns ...func() error) error {
|
|||||||
cancel(err)
|
cancel(err)
|
||||||
}
|
}
|
||||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||||
drain(pipe)
|
|
||||||
}, WithWorkers(len(fns)))
|
}, WithWorkers(len(fns)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,7 +87,7 @@ func FinishVoid(fns ...func()) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
MapVoid(func(source chan<- interface{}) {
|
ForEach(func(source chan<- interface{}) {
|
||||||
for _, fn := range fns {
|
for _, fn := range fns {
|
||||||
source <- fn
|
source <- fn
|
||||||
}
|
}
|
||||||
@@ -90,45 +97,78 @@ func FinishVoid(fns ...func()) {
|
|||||||
}, WithWorkers(len(fns)))
|
}, WithWorkers(len(fns)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map maps all elements generated from given generate func, and returns an output channel.
|
// ForEach maps all elements from given generate but no output.
|
||||||
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
|
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
|
||||||
options := buildOptions(opts...)
|
options := buildOptions(opts...)
|
||||||
source := buildSource(generate)
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
|
source := buildSource(generate, panicChan)
|
||||||
collector := make(chan interface{}, options.workers)
|
collector := make(chan interface{}, options.workers)
|
||||||
done := syncx.NewDoneChan()
|
done := make(chan lang.PlaceholderType)
|
||||||
|
|
||||||
go executeMappers(options.ctx, mapper, source, collector, done.Done(), options.workers)
|
go executeMappers(mapperContext{
|
||||||
|
ctx: options.ctx,
|
||||||
|
mapper: func(item interface{}, writer Writer) {
|
||||||
|
mapper(item)
|
||||||
|
},
|
||||||
|
source: source,
|
||||||
|
panicChan: panicChan,
|
||||||
|
collector: collector,
|
||||||
|
doneChan: done,
|
||||||
|
workers: options.workers,
|
||||||
|
})
|
||||||
|
|
||||||
return collector
|
for {
|
||||||
|
select {
|
||||||
|
case v := <-panicChan.channel:
|
||||||
|
panic(v)
|
||||||
|
case _, ok := <-collector:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapReduce maps all elements generated from given generate func,
|
// MapReduce maps all elements generated from given generate func,
|
||||||
// and reduces the output elements with given reducer.
|
// and reduces the output elements with given reducer.
|
||||||
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
||||||
opts ...Option) (interface{}, error) {
|
opts ...Option) (interface{}, error) {
|
||||||
source := buildSource(generate)
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
return MapReduceWithSource(source, mapper, reducer, opts...)
|
source := buildSource(generate, panicChan)
|
||||||
|
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
|
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||||
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
||||||
opts ...Option) (interface{}, error) {
|
opts ...Option) (interface{}, error) {
|
||||||
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
|
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||||
|
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
|
||||||
|
reducer ReducerFunc, opts ...Option) (interface{}, error) {
|
||||||
options := buildOptions(opts...)
|
options := buildOptions(opts...)
|
||||||
|
// output is used to write the final result
|
||||||
output := make(chan interface{})
|
output := make(chan interface{})
|
||||||
defer func() {
|
defer func() {
|
||||||
|
// reducer can only write once, if more, panic
|
||||||
for range output {
|
for range output {
|
||||||
panic("more than one element written in reducer")
|
panic("more than one element written in reducer")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// collector is used to collect data from mapper, and consume in reducer
|
||||||
collector := make(chan interface{}, options.workers)
|
collector := make(chan interface{}, options.workers)
|
||||||
done := syncx.NewDoneChan()
|
// if done is closed, all mappers and reducer should stop processing
|
||||||
writer := newGuardedWriter(options.ctx, output, done.Done())
|
done := make(chan lang.PlaceholderType)
|
||||||
|
writer := newGuardedWriter(options.ctx, output, done)
|
||||||
var closeOnce sync.Once
|
var closeOnce sync.Once
|
||||||
|
// use atomic.Value to avoid data race
|
||||||
var retErr errorx.AtomicError
|
var retErr errorx.AtomicError
|
||||||
finish := func() {
|
finish := func() {
|
||||||
closeOnce.Do(func() {
|
closeOnce.Do(func() {
|
||||||
done.Close()
|
close(done)
|
||||||
close(output)
|
close(output)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -146,28 +186,41 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
|||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
drain(collector)
|
drain(collector)
|
||||||
|
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
cancel(fmt.Errorf("%v", r))
|
panicChan.write(r)
|
||||||
} else {
|
|
||||||
finish()
|
|
||||||
}
|
}
|
||||||
|
finish()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
reducer(collector, writer, cancel)
|
reducer(collector, writer, cancel)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go executeMappers(options.ctx, func(item interface{}, w Writer) {
|
go executeMappers(mapperContext{
|
||||||
mapper(item, w, cancel)
|
ctx: options.ctx,
|
||||||
}, source, collector, done.Done(), options.workers)
|
mapper: func(item interface{}, w Writer) {
|
||||||
|
mapper(item, w, cancel)
|
||||||
|
},
|
||||||
|
source: source,
|
||||||
|
panicChan: panicChan,
|
||||||
|
collector: collector,
|
||||||
|
doneChan: done,
|
||||||
|
workers: options.workers,
|
||||||
|
})
|
||||||
|
|
||||||
value, ok := <-output
|
select {
|
||||||
if err := retErr.Load(); err != nil {
|
case <-options.ctx.Done():
|
||||||
return nil, err
|
cancel(context.DeadlineExceeded)
|
||||||
} else if ok {
|
return nil, context.DeadlineExceeded
|
||||||
return value, nil
|
case v := <-panicChan.channel:
|
||||||
} else {
|
panic(v)
|
||||||
return nil, ErrReduceNoOutput
|
case v, ok := <-output:
|
||||||
|
if err := retErr.Load(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if ok {
|
||||||
|
return v, nil
|
||||||
|
} else {
|
||||||
|
return nil, ErrReduceNoOutput
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -176,18 +229,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
|||||||
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
|
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
|
||||||
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
|
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
reducer(input, cancel)
|
reducer(input, cancel)
|
||||||
// We need to write a placeholder to let MapReduce to continue on reducer done,
|
|
||||||
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
|
|
||||||
writer.Write(lang.Placeholder)
|
|
||||||
}, opts...)
|
}, opts...)
|
||||||
return err
|
if errors.Is(err, ErrReduceNoOutput) {
|
||||||
}
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// MapVoid maps all elements from given generate but no output.
|
return err
|
||||||
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
|
|
||||||
drain(Map(generate, func(item interface{}, writer Writer) {
|
|
||||||
mapper(item)
|
|
||||||
}, opts...))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithContext customizes a mapreduce processing accepts a given ctx.
|
// WithContext customizes a mapreduce processing accepts a given ctx.
|
||||||
@@ -217,12 +264,18 @@ func buildOptions(opts ...Option) *mapReduceOptions {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildSource(generate GenerateFunc) chan interface{} {
|
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
|
||||||
source := make(chan interface{})
|
source := make(chan interface{})
|
||||||
threading.GoSafe(func() {
|
go func() {
|
||||||
defer close(source)
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
panicChan.write(r)
|
||||||
|
}
|
||||||
|
close(source)
|
||||||
|
}()
|
||||||
|
|
||||||
generate(source)
|
generate(source)
|
||||||
})
|
}()
|
||||||
|
|
||||||
return source
|
return source
|
||||||
}
|
}
|
||||||
@@ -234,39 +287,43 @@ func drain(channel <-chan interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
|
func executeMappers(mCtx mapperContext) {
|
||||||
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
defer func() {
|
defer func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
close(collector)
|
close(mCtx.collector)
|
||||||
|
drain(mCtx.source)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
pool := make(chan lang.PlaceholderType, workers)
|
var failed int32
|
||||||
writer := newGuardedWriter(ctx, collector, done)
|
pool := make(chan lang.PlaceholderType, mCtx.workers)
|
||||||
for {
|
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
|
||||||
|
for atomic.LoadInt32(&failed) == 0 {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-mCtx.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-done:
|
case <-mCtx.doneChan:
|
||||||
return
|
return
|
||||||
case pool <- lang.Placeholder:
|
case pool <- lang.Placeholder:
|
||||||
item, ok := <-input
|
item, ok := <-mCtx.source
|
||||||
if !ok {
|
if !ok {
|
||||||
<-pool
|
<-pool
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
// better to safely run caller defined method
|
go func() {
|
||||||
threading.GoSafe(func() {
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
atomic.AddInt32(&failed, 1)
|
||||||
|
mCtx.panicChan.write(r)
|
||||||
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
<-pool
|
<-pool
|
||||||
}()
|
}()
|
||||||
|
|
||||||
mapper(item, writer)
|
mCtx.mapper(item, writer)
|
||||||
})
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -312,3 +369,16 @@ func (gw guardedWriter) Write(v interface{}) {
|
|||||||
gw.channel <- v
|
gw.channel <- v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type onceChan struct {
|
||||||
|
channel chan interface{}
|
||||||
|
wrote int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (oc *onceChan) write(val interface{}) {
|
||||||
|
if atomic.AddInt32(&oc.wrote, 1) > 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
oc.channel <- val
|
||||||
|
}
|
||||||
|
|||||||
78
core/mr/mapreduce_fuzz_test.go
Normal file
78
core/mr/mapreduce_fuzz_test.go
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
//go:build go1.18
|
||||||
|
// +build go1.18
|
||||||
|
|
||||||
|
package mr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.uber.org/goleak"
|
||||||
|
)
|
||||||
|
|
||||||
|
func FuzzMapReduce(f *testing.F) {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
f.Add(uint(10), uint(runtime.NumCPU()))
|
||||||
|
f.Fuzz(func(t *testing.T, num uint, workers uint) {
|
||||||
|
n := int64(num)%5000 + 5000
|
||||||
|
genPanic := rand.Intn(100) == 0
|
||||||
|
mapperPanic := rand.Intn(100) == 0
|
||||||
|
reducerPanic := rand.Intn(100) == 0
|
||||||
|
genIdx := rand.Int63n(n)
|
||||||
|
mapperIdx := rand.Int63n(n)
|
||||||
|
reducerIdx := rand.Int63n(n)
|
||||||
|
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||||
|
|
||||||
|
fn := func() (interface{}, error) {
|
||||||
|
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
|
||||||
|
|
||||||
|
return MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
source <- i
|
||||||
|
if genPanic && i == genIdx {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int64)
|
||||||
|
if mapperPanic && v == mapperIdx {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
writer.Write(v * v)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var idx int64
|
||||||
|
var total int64
|
||||||
|
for v := range pipe {
|
||||||
|
if reducerPanic && idx == reducerIdx {
|
||||||
|
panic("baz")
|
||||||
|
}
|
||||||
|
total += v.(int64)
|
||||||
|
idx++
|
||||||
|
}
|
||||||
|
writer.Write(total)
|
||||||
|
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||||
|
}
|
||||||
|
|
||||||
|
if genPanic || mapperPanic || reducerPanic {
|
||||||
|
var buf strings.Builder
|
||||||
|
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||||
|
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||||
|
} else {
|
||||||
|
val, err := fn()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, squareSum, val.(int64))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
107
core/mr/mapreduce_fuzzcase_test.go
Normal file
107
core/mr/mapreduce_fuzzcase_test.go
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
//go:build fuzz
|
||||||
|
// +build fuzz
|
||||||
|
|
||||||
|
package mr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
|
"gopkg.in/cheggaaa/pb.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
|
||||||
|
// so we need to simulate the fuzz test in test mode.
|
||||||
|
func TestMapReduceRandom(t *testing.T) {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
const (
|
||||||
|
times = 10000
|
||||||
|
nRange = 500
|
||||||
|
mega = 1024 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
bar := pb.New(times).Start()
|
||||||
|
runner := threading.NewTaskRunner(runtime.NumCPU())
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(times)
|
||||||
|
for i := 0; i < times; i++ {
|
||||||
|
runner.Schedule(func() {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
if time.Since(start) > time.Minute {
|
||||||
|
t.Fatal("timeout")
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||||
|
n := rand.Int63n(nRange)%nRange + nRange
|
||||||
|
workers := rand.Int()%50 + runtime.NumCPU()/2
|
||||||
|
genPanic := rand.Intn(100) == 0
|
||||||
|
mapperPanic := rand.Intn(100) == 0
|
||||||
|
reducerPanic := rand.Intn(100) == 0
|
||||||
|
genIdx := rand.Int63n(n)
|
||||||
|
mapperIdx := rand.Int63n(n)
|
||||||
|
reducerIdx := rand.Int63n(n)
|
||||||
|
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||||
|
|
||||||
|
fn := func() (interface{}, error) {
|
||||||
|
return MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
source <- i
|
||||||
|
if genPanic && i == genIdx {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int64)
|
||||||
|
if mapperPanic && v == mapperIdx {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
writer.Write(v * v)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var idx int64
|
||||||
|
var total int64
|
||||||
|
for v := range pipe {
|
||||||
|
if reducerPanic && idx == reducerIdx {
|
||||||
|
panic("baz")
|
||||||
|
}
|
||||||
|
total += v.(int64)
|
||||||
|
idx++
|
||||||
|
}
|
||||||
|
writer.Write(total)
|
||||||
|
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||||
|
}
|
||||||
|
|
||||||
|
if genPanic || mapperPanic || reducerPanic {
|
||||||
|
var buf strings.Builder
|
||||||
|
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||||
|
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||||
|
} else {
|
||||||
|
val, err := fn()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, squareSum, val.(int64))
|
||||||
|
}
|
||||||
|
bar.Increment()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
bar.Finish()
|
||||||
|
}
|
||||||
@@ -11,8 +11,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"go.uber.org/goleak"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var errDummy = errors.New("dummy")
|
var errDummy = errors.New("dummy")
|
||||||
@@ -22,6 +21,8 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFinish(t *testing.T) {
|
func TestFinish(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var total uint32
|
var total uint32
|
||||||
err := Finish(func() error {
|
err := Finish(func() error {
|
||||||
atomic.AddUint32(&total, 2)
|
atomic.AddUint32(&total, 2)
|
||||||
@@ -39,14 +40,20 @@ func TestFinish(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFinishNone(t *testing.T) {
|
func TestFinishNone(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
assert.Nil(t, Finish())
|
assert.Nil(t, Finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFinishVoidNone(t *testing.T) {
|
func TestFinishVoidNone(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
FinishVoid()
|
FinishVoid()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFinishErr(t *testing.T) {
|
func TestFinishErr(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var total uint32
|
var total uint32
|
||||||
err := Finish(func() error {
|
err := Finish(func() error {
|
||||||
atomic.AddUint32(&total, 2)
|
atomic.AddUint32(&total, 2)
|
||||||
@@ -63,6 +70,8 @@ func TestFinishErr(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFinishVoid(t *testing.T) {
|
func TestFinishVoid(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var total uint32
|
var total uint32
|
||||||
FinishVoid(func() {
|
FinishVoid(func() {
|
||||||
atomic.AddUint32(&total, 2)
|
atomic.AddUint32(&total, 2)
|
||||||
@@ -75,70 +84,107 @@ func TestFinishVoid(t *testing.T) {
|
|||||||
assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
|
assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMap(t *testing.T) {
|
func TestForEach(t *testing.T) {
|
||||||
tests := []struct {
|
const tasks = 1000
|
||||||
mapper MapFunc
|
|
||||||
expect int
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}, writer Writer) {
|
|
||||||
v := item.(int)
|
|
||||||
writer.Write(v * v)
|
|
||||||
},
|
|
||||||
expect: 30,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}, writer Writer) {
|
|
||||||
v := item.(int)
|
|
||||||
if v%2 == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
writer.Write(v * v)
|
|
||||||
},
|
|
||||||
expect: 10,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}, writer Writer) {
|
|
||||||
v := item.(int)
|
|
||||||
if v%2 == 0 {
|
|
||||||
panic(v)
|
|
||||||
}
|
|
||||||
writer.Write(v * v)
|
|
||||||
},
|
|
||||||
expect: 10,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
t.Run("all", func(t *testing.T) {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
defer goleak.VerifyNone(t)
|
||||||
channel := Map(func(source chan<- interface{}) {
|
|
||||||
for i := 1; i < 5; i++ {
|
var count uint32
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}) {
|
||||||
|
atomic.AddUint32(&count, 1)
|
||||||
|
}, WithWorkers(-1))
|
||||||
|
|
||||||
|
assert.Equal(t, tasks, int(count))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("odd", func(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var count uint32
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}) {
|
||||||
|
if item.(int)%2 == 0 {
|
||||||
|
atomic.AddUint32(&count, 1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Equal(t, tasks/2, int(count))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("all", func(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
}, test.mapper, WithWorkers(-1))
|
}, func(item interface{}) {
|
||||||
|
panic("foo")
|
||||||
var result int
|
})
|
||||||
for v := range channel {
|
|
||||||
result += v.(int)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, test.expect, result)
|
|
||||||
})
|
})
|
||||||
}
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGeneratePanic(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
t.Run("all", func(t *testing.T) {
|
||||||
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
|
panic("foo")
|
||||||
|
}, func(item interface{}) {
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapperPanic(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
const tasks = 1000
|
||||||
|
var run int32
|
||||||
|
t.Run("all", func(t *testing.T) {
|
||||||
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
atomic.AddInt32(&run, 1)
|
||||||
|
panic("foo")
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
})
|
||||||
|
})
|
||||||
|
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduce(t *testing.T) {
|
func TestMapReduce(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
name string
|
||||||
mapper MapperFunc
|
mapper MapperFunc
|
||||||
reducer ReducerFunc
|
reducer ReducerFunc
|
||||||
expectErr error
|
expectErr error
|
||||||
expectValue interface{}
|
expectValue interface{}
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
name: "simple",
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectValue: 30,
|
expectValue: 30,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with error",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -149,6 +195,7 @@ func TestMapReduce(t *testing.T) {
|
|||||||
expectErr: errDummy,
|
expectErr: errDummy,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with nil",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -160,6 +207,7 @@ func TestMapReduce(t *testing.T) {
|
|||||||
expectValue: nil,
|
expectValue: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with more",
|
||||||
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
var result int
|
var result int
|
||||||
for item := range pipe {
|
for item := range pipe {
|
||||||
@@ -174,36 +222,74 @@ func TestMapReduce(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
t.Run("MapReduce", func(t *testing.T) {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
for _, test := range tests {
|
||||||
if test.mapper == nil {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
if test.mapper == nil {
|
||||||
v := item.(int)
|
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
writer.Write(v * v)
|
v := item.(int)
|
||||||
}
|
writer.Write(v * v)
|
||||||
}
|
|
||||||
if test.reducer == nil {
|
|
||||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
|
||||||
var result int
|
|
||||||
for item := range pipe {
|
|
||||||
result += item.(int)
|
|
||||||
}
|
}
|
||||||
writer.Write(result)
|
|
||||||
}
|
}
|
||||||
}
|
if test.reducer == nil {
|
||||||
value, err := MapReduce(func(source chan<- interface{}) {
|
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
for i := 1; i < 5; i++ {
|
var result int
|
||||||
source <- i
|
for item := range pipe {
|
||||||
|
result += item.(int)
|
||||||
|
}
|
||||||
|
writer.Write(result)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
value, err := MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := 1; i < 5; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
||||||
|
|
||||||
assert.Equal(t, test.expectErr, err)
|
assert.Equal(t, test.expectErr, err)
|
||||||
assert.Equal(t, test.expectValue, value)
|
assert.Equal(t, test.expectValue, value)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("MapReduce", func(t *testing.T) {
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
if test.mapper == nil {
|
||||||
|
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int)
|
||||||
|
writer.Write(v * v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if test.reducer == nil {
|
||||||
|
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var result int
|
||||||
|
for item := range pipe {
|
||||||
|
result += item.(int)
|
||||||
|
}
|
||||||
|
writer.Write(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
source := make(chan interface{})
|
||||||
|
go func() {
|
||||||
|
for i := 1; i < 5; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
close(source)
|
||||||
|
}()
|
||||||
|
|
||||||
|
value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
|
||||||
|
assert.Equal(t, test.expectErr, err)
|
||||||
|
assert.Equal(t, test.expectValue, value)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
assert.Panics(t, func() {
|
||||||
MapReduce(func(source chan<- interface{}) {
|
MapReduce(func(source chan<- interface{}) {
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@@ -220,18 +306,23 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoid(t *testing.T) {
|
func TestMapReduceVoid(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var value uint32
|
var value uint32
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
name string
|
||||||
mapper MapperFunc
|
mapper MapperFunc
|
||||||
reducer VoidReducerFunc
|
reducer VoidReducerFunc
|
||||||
expectValue uint32
|
expectValue uint32
|
||||||
expectErr error
|
expectErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
name: "simple",
|
||||||
expectValue: 30,
|
expectValue: 30,
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with error",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -242,6 +333,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
expectErr: errDummy,
|
expectErr: errDummy,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with nil",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -252,6 +344,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
expectErr: ErrCancelWithNil,
|
expectErr: ErrCancelWithNil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with more",
|
||||||
reducer: func(pipe <-chan interface{}, cancel func(error)) {
|
reducer: func(pipe <-chan interface{}, cancel func(error)) {
|
||||||
for item := range pipe {
|
for item := range pipe {
|
||||||
result := atomic.AddUint32(&value, uint32(item.(int)))
|
result := atomic.AddUint32(&value, uint32(item.(int)))
|
||||||
@@ -265,7 +358,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
atomic.StoreUint32(&value, 0)
|
atomic.StoreUint32(&value, 0)
|
||||||
|
|
||||||
if test.mapper == nil {
|
if test.mapper == nil {
|
||||||
@@ -296,6 +389,8 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidWithDelay(t *testing.T) {
|
func TestMapReduceVoidWithDelay(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var result []int
|
var result []int
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
source <- 0
|
source <- 0
|
||||||
@@ -318,38 +413,64 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
|
|||||||
assert.Equal(t, 0, result[1])
|
assert.Equal(t, 0, result[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapVoid(t *testing.T) {
|
func TestMapReducePanic(t *testing.T) {
|
||||||
const tasks = 1000
|
defer goleak.VerifyNone(t)
|
||||||
var count uint32
|
|
||||||
MapVoid(func(source chan<- interface{}) {
|
|
||||||
for i := 0; i < tasks; i++ {
|
|
||||||
source <- i
|
|
||||||
}
|
|
||||||
}, func(item interface{}) {
|
|
||||||
atomic.AddUint32(&count, 1)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Equal(t, tasks, int(count))
|
assert.Panics(t, func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
source <- 0
|
||||||
|
source <- 1
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
i := item.(int)
|
||||||
|
writer.Write(i)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
for range pipe {
|
||||||
|
panic("panic")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReducePanic(t *testing.T) {
|
func TestMapReducePanicOnce(t *testing.T) {
|
||||||
v, err := MapReduce(func(source chan<- interface{}) {
|
defer goleak.VerifyNone(t)
|
||||||
source <- 0
|
|
||||||
source <- 1
|
assert.Panics(t, func() {
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
i := item.(int)
|
for i := 0; i < 100; i++ {
|
||||||
writer.Write(i)
|
source <- i
|
||||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
}
|
||||||
for range pipe {
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
panic("panic")
|
i := item.(int)
|
||||||
}
|
if i == 0 {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
writer.Write(i)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
for range pipe {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
assert.Panics(t, func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
source <- 0
|
||||||
|
source <- 1
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
panic("foo")
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
panic("bar")
|
||||||
|
})
|
||||||
})
|
})
|
||||||
assert.Nil(t, v)
|
|
||||||
assert.NotNil(t, err)
|
|
||||||
assert.Equal(t, "panic", err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidCancel(t *testing.T) {
|
func TestMapReduceVoidCancel(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var result []int
|
var result []int
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
source <- 0
|
source <- 0
|
||||||
@@ -371,13 +492,15 @@ func TestMapReduceVoidCancel(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
||||||
var done syncx.AtomicBool
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var done int32
|
||||||
var result []int
|
var result []int
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
if i == defaultWorkers/2 {
|
if i == defaultWorkers/2 {
|
||||||
@@ -392,10 +515,12 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
assert.Equal(t, "anything", err.Error())
|
assert.Equal(t, "anything", err.Error())
|
||||||
assert.True(t, done.True())
|
assert.Equal(t, int32(1), done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
uids := []int{1, 2, 3}
|
uids := []int{1, 2, 3}
|
||||||
res, err := MapReduce(func(source chan<- interface{}) {
|
res, err := MapReduce(func(source chan<- interface{}) {
|
||||||
for _, uid := range uids {
|
for _, uid := range uids {
|
||||||
@@ -412,33 +537,54 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
const message = "foo"
|
const message = "foo"
|
||||||
var done syncx.AtomicBool
|
assert.Panics(t, func() {
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
var done int32
|
||||||
|
_ = MapReduceVoid(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&done, 1)
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
i := item.(int)
|
||||||
|
writer.Write(i)
|
||||||
|
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||||
|
panic(message)
|
||||||
|
}, WithWorkers(1))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForEachWithContext(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var done int32
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
}, func(item interface{}) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
writer.Write(i)
|
if i == defaultWorkers/2 {
|
||||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
cancel()
|
||||||
panic(message)
|
}
|
||||||
}, WithWorkers(1))
|
}, WithContext(ctx))
|
||||||
assert.NotNil(t, err)
|
|
||||||
assert.Equal(t, message, err.Error())
|
|
||||||
assert.True(t, done.True())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceWithContext(t *testing.T) {
|
func TestMapReduceWithContext(t *testing.T) {
|
||||||
var done syncx.AtomicBool
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var done int32
|
||||||
var result []int
|
var result []int
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, c func(error)) {
|
}, func(item interface{}, writer Writer, c func(error)) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
if i == defaultWorkers/2 {
|
if i == defaultWorkers/2 {
|
||||||
@@ -452,7 +598,7 @@ func TestMapReduceWithContext(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}, WithContext(ctx))
|
}, WithContext(ctx))
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
assert.Equal(t, ErrReduceNoOutput, err)
|
assert.Equal(t, context.DeadlineExceeded, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkMapReduce(b *testing.B) {
|
func BenchmarkMapReduce(b *testing.B) {
|
||||||
|
|||||||
89
core/mr/readme-cn.md
Normal file
89
core/mr/readme-cn.md
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
# mapreduce
|
||||||
|
|
||||||
|
[English](readme.md) | 简体中文
|
||||||
|
|
||||||
|
## 为什么需要 MapReduce
|
||||||
|
|
||||||
|
在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。
|
||||||
|
|
||||||
|
比如要查询商品详情:
|
||||||
|
|
||||||
|
1. 商品服务-查询商品属性
|
||||||
|
2. 库存服务-查询库存属性
|
||||||
|
3. 价格服务-查询价格属性
|
||||||
|
4. 营销服务-查询营销属性
|
||||||
|
|
||||||
|
如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。
|
||||||
|
|
||||||
|
简单的场景下使用 `WaitGroup` 也能够满足需求,但是如果我们需要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?继续使用 `WaitGroup` 就有点力不从心了,go 的官方库中并没有这种工具(java 中提供了 CompleteFuture),我们依据 MapReduce 架构思想实现了进程内的数据批处理 MapReduce 并发工具类。
|
||||||
|
|
||||||
|
## 设计思路
|
||||||
|
|
||||||
|
我们尝试把自己代入到作者的角色梳理一下并发工具可能的业务场景:
|
||||||
|
|
||||||
|
1. 查询商品详情:支持并发调用多个服务来组合产品属性,支持调用错误可以立即结束。
|
||||||
|
2. 商品详情页自动推荐用户卡券:支持并发校验卡券,校验失败自动剔除,返回全部卡券。
|
||||||
|
|
||||||
|
以上实际都是在进行对输入数据进行处理最后输出清洗后的数据,针对数据处理有个非常经典的异步模式:生产者消费者模式。于是我们可以抽象一下数据批处理的生命周期,大致可以分为三个阶段:
|
||||||
|
|
||||||
|
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-serial-cn.png" width="500">
|
||||||
|
|
||||||
|
1. 数据生产 generate
|
||||||
|
2. 数据加工 mapper
|
||||||
|
3. 数据聚合 reducer
|
||||||
|
|
||||||
|
其中数据生产是不可或缺的阶段,数据加工、数据聚合是可选阶段,数据生产与加工支持并发调用,数据聚合基本属于纯内存操作单协程即可。
|
||||||
|
|
||||||
|
再来思考一下不同阶段之间数据应该如何流转,既然不同阶段的数据处理都是由不同 goroutine 执行的,那么很自然的可以考虑采用 channel 来实现 goroutine 之间的通信。
|
||||||
|
|
||||||
|
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-cn.png" width="500">
|
||||||
|
|
||||||
|
|
||||||
|
如何实现随时终止流程呢?
|
||||||
|
|
||||||
|
`goroutine` 中监听一个全局的结束 `channel` 和调用方提供的 `ctx` 就行。
|
||||||
|
|
||||||
|
## 简单示例
|
||||||
|
|
||||||
|
并行求平方和(不要嫌弃示例简单,只是模拟并发)
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/zeromicro/go-zero/core/mr"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
val, err := mr.MapReduce(func(source chan<- interface{}) {
|
||||||
|
// generator
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
|
||||||
|
// mapper
|
||||||
|
i := item.(int)
|
||||||
|
writer.Write(i * i)
|
||||||
|
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
|
||||||
|
// reducer
|
||||||
|
var sum int
|
||||||
|
for i := range pipe {
|
||||||
|
sum += i.(int)
|
||||||
|
}
|
||||||
|
writer.Write(sum)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
fmt.Println("result:", val)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
更多示例:[https://github.com/zeromicro/zero-examples/tree/main/mapreduce](https://github.com/zeromicro/zero-examples/tree/main/mapreduce)
|
||||||
|
|
||||||
|
## 欢迎 star!⭐
|
||||||
|
|
||||||
|
如果你正在使用或者觉得这个项目对你有帮助,请 **star** 支持,感谢!
|
||||||
90
core/mr/readme.md
Normal file
90
core/mr/readme.md
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
<img align="right" width="150px" src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/go-zero.png">
|
||||||
|
|
||||||
|
# mapreduce
|
||||||
|
|
||||||
|
English | [简体中文](readme-cn.md)
|
||||||
|
|
||||||
|
## Why MapReduce is needed
|
||||||
|
|
||||||
|
In practical business scenarios we often need to get the corresponding properties from different rpc services to assemble complex objects.
|
||||||
|
|
||||||
|
For example, to query product details.
|
||||||
|
|
||||||
|
1. product service - query product attributes
|
||||||
|
2. inventory service - query inventory properties
|
||||||
|
3. price service - query price attributes
|
||||||
|
4. marketing service - query marketing properties
|
||||||
|
|
||||||
|
If it is a serial call, the response time will increase linearly with the number of rpc calls, so we will generally change serial to parallel to optimize response time.
|
||||||
|
|
||||||
|
Simple scenarios using `WaitGroup` can also meet the needs, but what if we need to check the data returned by the rpc call, data processing, data aggregation? The official go library does not have such a tool (CompleteFuture is provided in java), so we implemented an in-process data batching MapReduce concurrent tool based on the MapReduce architecture.
|
||||||
|
|
||||||
|
## Design ideas
|
||||||
|
|
||||||
|
Let's try to put ourselves in the author's shoes and sort out the possible business scenarios for the concurrency tool:
|
||||||
|
|
||||||
|
1. querying product details: supporting concurrent calls to multiple services to combine product attributes, and supporting call errors that can be ended immediately.
|
||||||
|
2. automatic recommendation of user card coupons on product details page: support concurrently verifying card coupons, automatically rejecting them if they fail, and returning all of them.
|
||||||
|
|
||||||
|
The above is actually processing the input data and finally outputting the cleaned data. There is a very classic asynchronous pattern for data processing: the producer-consumer pattern. So we can abstract the life cycle of data batch processing, which can be roughly divided into three phases.
|
||||||
|
|
||||||
|
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-serial-en.png" width="500">
|
||||||
|
|
||||||
|
1. data production generate
|
||||||
|
2. data processing mapper
|
||||||
|
3. data aggregation reducer
|
||||||
|
|
||||||
|
Data producing is an indispensable stage, data processing and data aggregation are optional stages, data producing and processing support concurrent calls, data aggregation is basically a pure memory operation, so a single concurrent process can do it.
|
||||||
|
|
||||||
|
Since different stages of data processing are performed by different goroutines, it is natural to consider the use of channel to achieve communication between goroutines.
|
||||||
|
|
||||||
|
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-en.png" width="500">
|
||||||
|
|
||||||
|
How can I terminate the process at any time?
|
||||||
|
|
||||||
|
It's simple, just receive from a channel or the given context in the goroutine.
|
||||||
|
|
||||||
|
## A simple example
|
||||||
|
|
||||||
|
Calculate the sum of squares, simulating the concurrency.
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/zeromicro/go-zero/core/mr"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
val, err := mr.MapReduce(func(source chan<- interface{}) {
|
||||||
|
// generator
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
|
||||||
|
// mapper
|
||||||
|
i := item.(int)
|
||||||
|
writer.Write(i * i)
|
||||||
|
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
|
||||||
|
// reducer
|
||||||
|
var sum int
|
||||||
|
for i := range pipe {
|
||||||
|
sum += i.(int)
|
||||||
|
}
|
||||||
|
writer.Write(sum)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
fmt.Println("result:", val)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
More examples: [https://github.com/zeromicro/zero-examples/tree/main/mapreduce](https://github.com/zeromicro/zero-examples/tree/main/mapreduce)
|
||||||
|
|
||||||
|
## Give a Star! ⭐
|
||||||
|
|
||||||
|
If you like or are using this project to learn or start your solution, please give it a star. Thanks!
|
||||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user