Commit eea95097 authored by Zhou Yaochen's avatar Zhou Yaochen
Browse files

add go mod

parent 586959db
Pipeline #11 failed with stages
in 0 seconds

Too many changes to show.

To preserve performance only 722 of 722+ files are displayed.
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"testing"
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
)
func TestParseDuration(t *testing.T) {
tests := []struct {
in string
// out or fail are mutually exclusive
out time.Duration
fail bool
}{
{in: "10ms", out: 10 * time.Millisecond},
{in: "3s", out: 3 * time.Second},
{in: "60m", out: 60 * time.Minute},
{in: "12h", out: 12 * time.Hour},
{in: "7d", out: 168 * time.Hour},
{in: "", fail: true},
{in: "0", fail: true},
{in: "7ns", fail: true},
{in: "14mo", fail: true},
{in: "3.5h", fail: true},
{in: "106752d", fail: true}, // overflow
}
for _, tc := range tests {
got, err := parseDuration(tc.in)
if !tc.fail && err != nil {
t.Errorf("parseDuration(%q) unexpectedly failed: %v", tc.in, err)
continue
}
if tc.fail && err == nil {
t.Errorf("parseDuration(%q) did not fail", tc.in)
continue
}
if tc.fail {
continue
}
if got != tc.out {
t.Errorf("parseDuration(%q) = %v, want %v", tc.in, got, tc.out)
}
}
}
func TestParseArgs(t *testing.T) {
got, err := parseArgs([]string{"a=1", "b=2"}, []string{"a", "b"})
if err != nil {
t.Fatal(err)
}
want := map[string]string{"a": "1", "b": "2"}
if !testutil.Equal(got, want) {
t.Fatalf("got %v, want %v", got, want)
}
if _, err := parseArgs([]string{"a1"}, []string{"a1"}); err == nil {
t.Error("malformed: got nil, want error")
}
if _, err := parseArgs([]string{"a=1"}, []string{"b"}); err == nil {
t.Error("invalid: got nil, want error")
}
}
func TestParseColumnsFilter(t *testing.T) {
tests := []struct {
in string
out bigtable.Filter
fail bool
}{
{
in: "columnA",
out: bigtable.ColumnFilter("columnA"),
},
{
in: "familyA:columnA",
out: bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
},
{
in: "columnA,columnB",
out: bigtable.InterleaveFilters(bigtable.ColumnFilter("columnA"), bigtable.ColumnFilter("columnB")),
},
{
in: "familyA:columnA,columnB",
out: bigtable.InterleaveFilters(
bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
bigtable.ColumnFilter("columnB"),
),
},
{
in: "columnA,familyB:columnB",
out: bigtable.InterleaveFilters(
bigtable.ColumnFilter("columnA"),
bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
),
},
{
in: "familyA:columnA,familyB:columnB",
out: bigtable.InterleaveFilters(
bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
),
},
{
in: "familyA:",
out: bigtable.FamilyFilter("familyA"),
},
{
in: ":columnA",
out: bigtable.ColumnFilter("columnA"),
},
{
in: ",:columnA,,familyB:columnB,",
out: bigtable.InterleaveFilters(
bigtable.ColumnFilter("columnA"),
bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
),
},
{
in: "familyA:columnA:cellA",
fail: true,
},
{
in: "familyA::columnA",
fail: true,
},
}
for _, tc := range tests {
got, err := parseColumnsFilter(tc.in)
if !tc.fail && err != nil {
t.Errorf("parseColumnsFilter(%q) unexpectedly failed: %v", tc.in, err)
continue
}
if tc.fail && err == nil {
t.Errorf("parseColumnsFilter(%q) did not fail", tc.in)
continue
}
if tc.fail {
continue
}
var cmpOpts cmp.Options
cmpOpts =
append(
cmpOpts,
cmp.AllowUnexported(bigtable.ChainFilters([]bigtable.Filter{}...)),
cmp.AllowUnexported(bigtable.InterleaveFilters([]bigtable.Filter{}...)))
if !cmp.Equal(got, tc.out, cmpOpts) {
t.Errorf("parseColumnsFilter(%q) = %v, want %v", tc.in, got, tc.out)
}
}
}
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// DO NOT EDIT. THIS IS AUTOMATICALLY GENERATED.
// Run "go generate" to regenerate.
//go:generate go run cbt.go gcpolicy.go -o cbtdoc.go doc
/*
Cbt is a tool for doing basic interactions with Cloud Bigtable. To learn how to
install the cbt tool, see the
[cbt overview](https://cloud.google.com/bigtable/docs/cbt-overview).
Usage:
cbt [options] command [arguments]
The commands are:
count Count rows in a table
createinstance Create an instance with an initial cluster
createcluster Create a cluster in the configured instance
createfamily Create a column family
createtable Create a table
updatecluster Update a cluster in the configured instance
deleteinstance Delete an instance
deletecluster Delete a cluster from the configured instance
deletecolumn Delete all cells in a column
deletefamily Delete a column family
deleterow Delete a row
deletetable Delete a table
doc Print godoc-suitable documentation for cbt
help Print help text
listinstances List instances in a project
listclusters List clusters in an instance
lookup Read from a single row
ls List tables and column families
mddoc Print documentation for cbt in Markdown format
read Read rows
set Set value of a cell
setgcpolicy Set the GC policy for a column family
waitforreplication Block until all the completed writes have been replicated to all the clusters
createtablefromsnapshot Create a table from a snapshot (snapshots alpha)
createsnapshot Create a snapshot from a source table (snapshots alpha)
listsnapshots List snapshots in a cluster (snapshots alpha)
getsnapshot Get snapshot info (snapshots alpha)
deletesnapshot Delete snapshot in a cluster (snapshots alpha)
version Print the current cbt version
createappprofile Creates app profile for an instance
getappprofile Reads app profile for an instance
listappprofile Lists app profile for an instance
updateappprofile Updates app profile for an instance
deleteappprofile Deletes app profile for an instance
Use "cbt help <command>" for more information about a command.
The options are:
-project string
project ID, if unset uses gcloud configured project
-instance string
Cloud Bigtable instance
-creds string
if set, use application credentials in this file
Alpha features are not currently available to most Cloud Bigtable customers. The
features might be changed in backward-incompatible ways and are not recommended
for production use. They are not subject to any SLA or deprecation policy.
Note: cbt does not support specifying arbitrary bytes on the command line for
any value that Bigtable otherwise supports (e.g., row key, column qualifier,
etc.).
For convenience, values of the -project, -instance, -creds,
-admin-endpoint and -data-endpoint flags may be specified in
~/.cbtrc in this format:
project = my-project-123
instance = my-instance
creds = path-to-account-key.json
admin-endpoint = hostname:port
data-endpoint = hostname:port
All values are optional, and all will be overridden by flags.
Count rows in a table
Usage:
cbt count <table>
Create an instance with an initial cluster
Usage:
cbt createinstance <instance-id> <display-name> <cluster-id> <zone> <num-nodes> <storage type>
instance-id Permanent, unique id for the instance
display-name Description of the instance
cluster-id Permanent, unique id for the cluster in the instance
zone The zone in which to create the cluster
num-nodes The number of nodes to create
storage-type SSD or HDD
Create a cluster in the configured instance
Usage:
cbt createcluster <cluster-id> <zone> <num-nodes> <storage type>
cluster-id Permanent, unique id for the cluster in the instance
zone The zone in which to create the cluster
num-nodes The number of nodes to create
storage-type SSD or HDD
Create a column family
Usage:
cbt createfamily <table> <family>
Create a table
Usage:
cbt createtable <table> [families=family[:gcpolicy],...] [splits=split,...]
families: Column families and their associated GC policies. For gcpolicy,
see "setgcpolicy".
Example: families=family1:maxage=1w,family2:maxversions=1
splits: Row key to be used to initially split the table
Update a cluster in the configured instance
Usage:
cbt updatecluster <cluster-id> [num-nodes=num-nodes]
cluster-id Permanent, unique id for the cluster in the instance
num-nodes The number of nodes to update to
Delete an instance
Usage:
cbt deleteinstance <instance>
Delete a cluster from the configured instance
Usage:
cbt deletecluster <cluster>
Delete all cells in a column
Usage:
cbt deletecolumn <table> <row> <family> <column> [app-profile=<app profile id>]
app-profile=<app profile id> The app profile id to use for the request
Delete a column family
Usage:
cbt deletefamily <table> <family>
Delete a row
Usage:
cbt deleterow <table> <row> [app-profile=<app profile id>]
app-profile=<app profile id> The app profile id to use for the request
Delete a table
Usage:
cbt deletetable <table>
Print godoc-suitable documentation for cbt
Usage:
cbt doc
Print help text
Usage:
cbt help [command]
List instances in a project
Usage:
cbt listinstances
List clusters in an instance
Usage:
cbt listclusters
Read from a single row
Usage:
cbt lookup <table> <row> [columns=[family]:[qualifier],...] [cells-per-column=<n>] [app-profile=<app profile id>]
columns=[family]:[qualifier],... Read only these columns, comma-separated
cells-per-column=<n> Read only this many cells per column
app-profile=<app profile id> The app profile id to use for the request
List tables and column families
Usage:
cbt ls List tables
cbt ls <table> List column families in <table>
Print documentation for cbt in Markdown format
Usage:
cbt mddoc
Read rows
Usage:
cbt read <table> [start=<row>] [end=<row>] [prefix=<prefix>] [regex=<regex>] [columns=[family]:[qualifier],...] [count=<n>] [cells-per-column=<n>] [app-profile=<app profile id>]
start=<row> Start reading at this row
end=<row> Stop reading before this row
prefix=<prefix> Read rows with this prefix
regex=<regex> Read rows with keys matching this regex
columns=[family]:[qualifier],... Read only these columns, comma-separated
count=<n> Read only this many rows
cells-per-column=<n> Read only this many cells per column
app-profile=<app profile id> The app profile id to use for the request
Set value of a cell
Usage:
cbt set <table> <row> [app-profile=<app profile id>] family:column=val[@ts] ...
app-profile=<app profile id> The app profile id to use for the request
family:column=val[@ts] may be repeated to set multiple cells.
ts is an optional integer timestamp.
If it cannot be parsed, the `@ts` part will be
interpreted as part of the value.
Set the GC policy for a column family
Usage:
cbt setgcpolicy <table> <family> ((maxage=<d> | maxversions=<n>) [(and|or) (maxage=<d> | maxversions=<n>),...] | never)
maxage=<d> Maximum timestamp age to preserve (e.g. "1h", "4d")
maxversions=<n> Maximum number of versions to preserve
Block until all the completed writes have been replicated to all the clusters
Usage:
cbt waitforreplication <table>
Create a table from a snapshot (snapshots alpha)
Usage:
cbt createtablefromsnapshot <table> <cluster> <snapshot>
table The name of the table to create
cluster The cluster where the snapshot is located
snapshot The snapshot to restore
Create a snapshot from a source table (snapshots alpha)
Usage:
cbt createsnapshot <cluster> <snapshot> <table> [ttl=<d>]
[ttl=<d>] Lifespan of the snapshot (e.g. "1h", "4d")
List snapshots in a cluster (snapshots alpha)
Usage:
cbt listsnapshots [<cluster>]
Get snapshot info (snapshots alpha)
Usage:
cbt getsnapshot <cluster> <snapshot>
Delete snapshot in a cluster (snapshots alpha)
Usage:
cbt deletesnapshot <cluster> <snapshot>
Print the current cbt version
Usage:
cbt version
Creates app profile for an instance
Usage:
usage: cbt createappprofile <instance-id> <profile-id> <description> (route-any | [ route-to=<cluster-id> : transactional-writes]) [optional flag]
optional flags may be `force`
Reads app profile for an instance
Usage:
cbt getappprofile <instance-id> <profile-id>
Lists app profile for an instance
Usage:
cbt listappprofile <instance-id>
Updates app profile for an instance
Usage:
usage: cbt updateappprofile <instance-id> <profile-id> <description>(route-any | [ route-to=<cluster-id> : transactional-writes]) [optional flag]
optional flags may be `force`
Deletes app profile for an instance
Usage:
cbt deleteappprofile <instance-id> <profile-id>
*/
package main
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
"errors"
"fmt"
"io"
"strconv"
"strings"
"unicode"
"cloud.google.com/go/bigtable"
)
// Parse a GC policy. Valid policies include
// never
// maxage = 5d
// maxversions = 3
// maxage = 5d || maxversions = 3
// maxage=30d || (maxage=3d && maxversions=100)
func parseGCPolicy(s string) (bigtable.GCPolicy, error) {
if strings.TrimSpace(s) == "never" {
return bigtable.NoGcPolicy(), nil
}
r := strings.NewReader(s)
p, err := parsePolicyExpr(r)
if err != nil {
return nil, fmt.Errorf("invalid GC policy: %v", err)
}
tok, err := getToken(r)
if err != nil {
return nil, err
}
if tok != "" {
return nil, fmt.Errorf("invalid GC policy: want end of input, got %q", tok)
}
return p, nil
}
// expr ::= term (op term)*
// op ::= "and" | "or" | "&&" | "||"
func parsePolicyExpr(r io.RuneScanner) (bigtable.GCPolicy, error) {
policy, err := parsePolicyTerm(r)
if err != nil {
return nil, err
}
for {
tok, err := getToken(r)
if err != nil {
return nil, err
}
var f func(...bigtable.GCPolicy) bigtable.GCPolicy
switch tok {
case "and", "&&":
f = bigtable.IntersectionPolicy
case "or", "||":
f = bigtable.UnionPolicy
default:
ungetToken(tok)
return policy, nil
}
p2, err := parsePolicyTerm(r)
if err != nil {
return nil, err
}
policy = f(policy, p2)
}
}
// term ::= "maxage" "=" duration | "maxversions" "=" int | "(" policy ")"
func parsePolicyTerm(r io.RuneScanner) (bigtable.GCPolicy, error) {
tok, err := getToken(r)
if err != nil {
return nil, err
}
switch tok {
case "":
return nil, errors.New("empty GC policy term")
case "maxage", "maxversions":
if err := expectToken(r, "="); err != nil {
return nil, err
}
tok2, err := getToken(r)
if err != nil {
return nil, err
}
if tok2 == "" {
return nil, errors.New("expected a token after '='")
}
if tok == "maxage" {
dur, err := parseDuration(tok2)
if err != nil {
return nil, err
}
return bigtable.MaxAgePolicy(dur), nil
}
n, err := strconv.ParseUint(tok2, 10, 16)
if err != nil {
return nil, err
}
return bigtable.MaxVersionsPolicy(int(n)), nil
case "(":
p, err := parsePolicyExpr(r)
if err != nil {
return nil, err
}
if err := expectToken(r, ")"); err != nil {
return nil, err
}
return p, nil
default:
return nil, fmt.Errorf("unexpected token: %q", tok)
}
}
func expectToken(r io.RuneScanner, want string) error {
got, err := getToken(r)
if err != nil {
return err
}
if got != want {
return fmt.Errorf("expected %q, saw %q", want, got)
}
return nil
}
const noToken = "_" // empty token is valid, so use "_" instead
// If not noToken, getToken will return this instead of reading a new token
// from the input.
var ungotToken = noToken
// getToken extracts the first token from the input. Valid tokens include
// any sequence of letters and digits, and these symbols: &&, ||, =, ( and ).
// getToken returns ("", nil) at end of input.
func getToken(r io.RuneScanner) (string, error) {
if ungotToken != noToken {
t := ungotToken
ungotToken = noToken
return t, nil
}
var err error
// Skip leading whitespace.
c := ' '
for unicode.IsSpace(c) {
c, _, err = r.ReadRune()
if err == io.EOF {
return "", nil
}
if err != nil {
return "", err
}
}
switch {
case c == '=' || c == '(' || c == ')':
return string(c), nil
case c == '&' || c == '|':
c2, _, err := r.ReadRune()
if err != nil && err != io.EOF {
return "", err
}
if c != c2 {
return "", fmt.Errorf("expected %c%c", c, c)
}
return string([]rune{c, c}), nil
case unicode.IsLetter(c) || unicode.IsDigit(c):
// Collect an alphanumeric token.
var b bytes.Buffer
for unicode.IsLetter(c) || unicode.IsDigit(c) {
b.WriteRune(c)
c, _, err = r.ReadRune()
if err == io.EOF {
break
}
if err != nil {
return "", err
}
}
r.UnreadRune()
return b.String(), nil
default:
return "", fmt.Errorf("bad rune %q", c)
}
}
// "unget" a token so the next call to getToken will return it.
func ungetToken(tok string) {
if ungotToken != noToken {
panic("ungetToken called twice")
}
ungotToken = tok
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"strings"
"testing"
"time"
"cloud.google.com/go/bigtable"
"github.com/google/go-cmp/cmp"
)
func TestParseGCPolicy(t *testing.T) {
for _, test := range []struct {
in string
want bigtable.GCPolicy
}{
{
"never",
bigtable.NoGcPolicy(),
},
{
"maxage=3h",
bigtable.MaxAgePolicy(3 * time.Hour),
},
{
"maxversions=2",
bigtable.MaxVersionsPolicy(2),
},
{
"maxversions=2 and maxage=1h",
bigtable.IntersectionPolicy(bigtable.MaxVersionsPolicy(2), bigtable.MaxAgePolicy(time.Hour)),
},
{
"(((maxversions=2 and (maxage=1h))))",
bigtable.IntersectionPolicy(bigtable.MaxVersionsPolicy(2), bigtable.MaxAgePolicy(time.Hour)),
},
{
"maxversions=7 or maxage=8h",
bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(7), bigtable.MaxAgePolicy(8*time.Hour)),
},
{
"maxversions = 7||maxage = 8h",
bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(7), bigtable.MaxAgePolicy(8*time.Hour)),
},
{
"maxversions=7||maxage=8h",
bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(7), bigtable.MaxAgePolicy(8*time.Hour)),
},
{
"maxage=30d || (maxage=3d && maxversions=100)",
bigtable.UnionPolicy(
bigtable.MaxAgePolicy(30*24*time.Hour),
bigtable.IntersectionPolicy(
bigtable.MaxAgePolicy(3*24*time.Hour),
bigtable.MaxVersionsPolicy(100))),
},
{
"maxage=30d || (maxage=3d && maxversions=100) || maxversions=7",
bigtable.UnionPolicy(
bigtable.UnionPolicy(
bigtable.MaxAgePolicy(30*24*time.Hour),
bigtable.IntersectionPolicy(
bigtable.MaxAgePolicy(3*24*time.Hour),
bigtable.MaxVersionsPolicy(100))),
bigtable.MaxVersionsPolicy(7)),
},
{
// && and || have same precedence, left associativity
"maxage=1h && maxage=2h || maxage=3h",
bigtable.UnionPolicy(
bigtable.IntersectionPolicy(
bigtable.MaxAgePolicy(1*time.Hour),
bigtable.MaxAgePolicy(2*time.Hour)),
bigtable.MaxAgePolicy(3*time.Hour)),
},
} {
got, err := parseGCPolicy(test.in)
if err != nil {
t.Errorf("%s: %v", test.in, err)
continue
}
if !cmp.Equal(got, test.want, cmp.AllowUnexported(bigtable.IntersectionPolicy(), bigtable.UnionPolicy())) {
t.Errorf("%s: got %+v, want %+v", test.in, got, test.want)
}
}
}
func TestParseGCPolicyErrors(t *testing.T) {
for _, in := range []string{
"",
"a",
"b = 1h",
"c = 1",
"maxage=1", // need duration
"maxversions=1h", // need int
"maxage",
"maxversions",
"never=never",
"maxversions=1 && never",
"(((maxage=1h))",
"((maxage=1h)))",
"maxage=30d || ((maxage=3d && maxversions=100)",
"maxversions = 3 and",
} {
_, err := parseGCPolicy(in)
if err == nil {
t.Errorf("%s: got nil, want error", in)
}
}
}
func TestTokenizeGCPolicy(t *testing.T) {
for _, test := range []struct {
in string
want []string
}{
{
"maxage=5d",
[]string{"maxage", "=", "5d"},
},
{
"maxage = 5d",
[]string{"maxage", "=", "5d"},
},
{
"maxage=5d or maxversions=5",
[]string{"maxage", "=", "5d", "or", "maxversions", "=", "5"},
},
{
"maxage=5d || (maxversions=5)",
[]string{"maxage", "=", "5d", "||", "(", "maxversions", "=", "5", ")"},
},
{
"maxage=5d||( maxversions=5 )",
[]string{"maxage", "=", "5d", "||", "(", "maxversions", "=", "5", ")"},
},
} {
got, err := tokenizeGCPolicy(test.in)
if err != nil {
t.Errorf("%s: %v", test.in, err)
continue
}
if diff := cmp.Diff(got, test.want); diff != "" {
t.Errorf("%s: %s", test.in, diff)
}
}
}
func TestTokenizeGCPolicyErrors(t *testing.T) {
for _, in := range []string{
"a &",
"a & b",
"a &x b",
"a |",
"a | b",
"a |& b",
"a % b",
} {
_, err := tokenizeGCPolicy(in)
if err == nil {
t.Errorf("%s: got nil, want error", in)
}
}
}
func tokenizeGCPolicy(s string) ([]string, error) {
var tokens []string
r := strings.NewReader(s)
for {
tok, err := getToken(r)
if err != nil {
return nil, err
}
if tok == "" {
break
}
tokens = append(tokens, tok)
}
return tokens, nil
}
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
cbtemulator launches the in-memory Cloud Bigtable server on the given address.
*/
package main
import (
"flag"
"fmt"
"log"
"cloud.google.com/go/bigtable/bttest"
"google.golang.org/grpc"
)
var (
host = flag.String("host", "localhost", "the address to bind to on the local machine")
port = flag.Int("port", 9000, "the port number to bind to on the local machine")
)
func main() {
grpc.EnableTracing = false
flag.Parse()
srv, err := bttest.NewServer(fmt.Sprintf("%s:%d", *host, *port))
if err != nil {
log.Fatalf("failed to start emulator: %v", err)
}
fmt.Printf("Cloud Bigtable emulator running on %s\n", srv.Addr)
select {}
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Loadtest does some load testing through the Go client library for Cloud Bigtable.
*/
package main
import (
"bytes"
"context"
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/internal/cbtconfig"
"cloud.google.com/go/bigtable/internal/stat"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
var (
runFor = flag.Duration("run_for", 5*time.Second,
"how long to run the load test for; 0 to run forever until SIGTERM")
scratchTable = flag.String("scratch_table", "loadtest-scratch", "name of table to use; should not already exist")
csvOutput = flag.String("csv_output", "",
"output path for statistics in .csv format. If this file already exists it will be overwritten.")
poolSize = flag.Int("pool_size", 1, "size of the gRPC connection pool to use for the data client")
reqCount = flag.Int("req_count", 100, "number of concurrent requests")
config *cbtconfig.Config
client *bigtable.Client
adminClient *bigtable.AdminClient
)
func main() {
var err error
config, err = cbtconfig.Load()
if err != nil {
log.Fatal(err)
}
config.RegisterFlags()
flag.Parse()
if err := config.CheckFlags(cbtconfig.ProjectAndInstanceRequired); err != nil {
log.Fatal(err)
}
if config.Creds != "" {
os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", config.Creds)
}
if flag.NArg() != 0 {
flag.Usage()
os.Exit(1)
}
var options []option.ClientOption
if *poolSize > 1 {
options = append(options,
option.WithGRPCConnectionPool(*poolSize),
// TODO(grpc/grpc-go#1388) using connection pool without WithBlock
// can cause RPCs to fail randomly. We can delete this after the issue is fixed.
option.WithGRPCDialOption(grpc.WithBlock()))
}
var csvFile *os.File
if *csvOutput != "" {
csvFile, err = os.Create(*csvOutput)
if err != nil {
log.Fatalf("creating csv output file: %v", err)
}
defer csvFile.Close()
log.Printf("Writing statistics to %q ...", *csvOutput)
}
log.Printf("Dialing connections...")
client, err = bigtable.NewClient(context.Background(), config.Project, config.Instance, options...)
if err != nil {
log.Fatalf("Making bigtable.Client: %v", err)
}
defer client.Close()
adminClient, err = bigtable.NewAdminClient(context.Background(), config.Project, config.Instance)
if err != nil {
log.Fatalf("Making bigtable.AdminClient: %v", err)
}
defer adminClient.Close()
// Create a scratch table.
log.Printf("Setting up scratch table...")
tblConf := bigtable.TableConf{
TableID: *scratchTable,
Families: map[string]bigtable.GCPolicy{"f": bigtable.MaxVersionsPolicy(1)},
}
if err := adminClient.CreateTableFromConf(context.Background(), &tblConf); err != nil {
log.Fatalf("Making scratch table %q: %v", *scratchTable, err)
}
// Upon a successful run, delete the table. Don't bother checking for errors.
defer adminClient.DeleteTable(context.Background(), *scratchTable)
// Also delete the table on SIGTERM.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
s := <-c
log.Printf("Caught %v, cleaning scratch table.", s)
_ = adminClient.DeleteTable(context.Background(), *scratchTable)
os.Exit(1)
}()
log.Printf("Starting load test... (run for %v)", *runFor)
tbl := client.Open(*scratchTable)
sem := make(chan int, *reqCount) // limit the number of requests happening at once
var reads, writes stats
stopTime := time.Now().Add(*runFor)
var wg sync.WaitGroup
for time.Now().Before(stopTime) || *runFor == 0 {
sem <- 1
wg.Add(1)
go func() {
defer wg.Done()
defer func() { <-sem }()
ok := true
opStart := time.Now()
var stats *stats
defer func() {
stats.Record(ok, time.Since(opStart))
}()
row := fmt.Sprintf("row%d", rand.Intn(100)) // operate on 1 of 100 rows
switch rand.Intn(10) {
default:
// read
stats = &reads
_, err := tbl.ReadRow(context.Background(), row, bigtable.RowFilter(bigtable.LatestNFilter(1)))
if err != nil {
log.Printf("Error doing read: %v", err)
ok = false
}
case 0, 1, 2, 3, 4:
// write
stats = &writes
mut := bigtable.NewMutation()
mut.Set("f", "col", bigtable.Now(), bytes.Repeat([]byte("0"), 1<<10)) // 1 KB write
if err := tbl.Apply(context.Background(), row, mut); err != nil {
log.Printf("Error doing mutation: %v", err)
ok = false
}
}
}()
}
wg.Wait()
readsAgg := stat.NewAggregate("reads", reads.ds, reads.tries-reads.ok)
writesAgg := stat.NewAggregate("writes", writes.ds, writes.tries-writes.ok)
log.Printf("Reads (%d ok / %d tries):\n%v", reads.ok, reads.tries, readsAgg)
log.Printf("Writes (%d ok / %d tries):\n%v", writes.ok, writes.tries, writesAgg)
if csvFile != nil {
stat.WriteCSV([]*stat.Aggregate{readsAgg, writesAgg}, csvFile)
}
}
var allStats int64 // atomic
type stats struct {
mu sync.Mutex
tries, ok int
ds []time.Duration
}
func (s *stats) Record(ok bool, d time.Duration) {
s.mu.Lock()
s.tries++
if ok {
s.ok++
}
s.ds = append(s.ds, d)
s.mu.Unlock()
if n := atomic.AddInt64(&allStats, 1); n%1000 == 0 {
log.Printf("Progress: done %d ops", n)
}
}
/*
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Scantest does scan-related load testing against Cloud Bigtable. The logic here
mimics a similar test written using the Java client.
*/
package main
import (
"bytes"
"context"
"flag"
"fmt"
"log"
"math/rand"
"os"
"sync"
"sync/atomic"
"text/tabwriter"
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/internal/cbtconfig"
"cloud.google.com/go/bigtable/internal/stat"
)
var (
runFor = flag.Duration("run_for", 5*time.Second, "how long to run the load test for")
numScans = flag.Int("concurrent_scans", 1, "number of concurrent scans")
rowLimit = flag.Int("row_limit", 10000, "max number of records per scan")
config *cbtconfig.Config
client *bigtable.Client
)
func main() {
flag.Usage = func() {
fmt.Printf("Usage: scantest [options] <table_name>\n\n")
flag.PrintDefaults()
}
var err error
config, err = cbtconfig.Load()
if err != nil {
log.Fatal(err)
}
config.RegisterFlags()
flag.Parse()
if err := config.CheckFlags(cbtconfig.ProjectAndInstanceRequired); err != nil {
log.Fatal(err)
}
if config.Creds != "" {
os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", config.Creds)
}
if flag.NArg() != 1 {
flag.Usage()
os.Exit(1)
}
table := flag.Arg(0)
log.Printf("Dialing connections...")
client, err = bigtable.NewClient(context.Background(), config.Project, config.Instance)
if err != nil {
log.Fatalf("Making bigtable.Client: %v", err)
}
defer client.Close()
log.Printf("Starting scan test... (run for %v)", *runFor)
tbl := client.Open(table)
sem := make(chan int, *numScans) // limit the number of requests happening at once
var scans stats
stopTime := time.Now().Add(*runFor)
var wg sync.WaitGroup
for time.Now().Before(stopTime) {
sem <- 1
wg.Add(1)
go func() {
defer wg.Done()
defer func() { <-sem }()
ok := true
opStart := time.Now()
defer func() {
scans.Record(ok, time.Since(opStart))
}()
// Start at a random row key
key := fmt.Sprintf("user%d", rand.Int63())
limit := bigtable.LimitRows(int64(*rowLimit))
noop := func(bigtable.Row) bool { return true }
if err := tbl.ReadRows(context.Background(), bigtable.NewRange(key, ""), noop, limit); err != nil {
log.Printf("Error during scan: %v", err)
ok = false
}
}()
}
wg.Wait()
agg := stat.NewAggregate("scans", scans.ds, scans.tries-scans.ok)
log.Printf("Scans (%d ok / %d tries):\nscan times:\n%v\nthroughput (rows/second):\n%v",
scans.ok, scans.tries, agg, throughputString(agg))
}
func throughputString(agg *stat.Aggregate) string {
var buf bytes.Buffer
tw := tabwriter.NewWriter(&buf, 0, 0, 1, ' ', 0) // one-space padding
rowLimitF := float64(*rowLimit)
fmt.Fprintf(
tw,
"min:\t%.2f\nmedian:\t%.2f\nmax:\t%.2f\n",
rowLimitF/agg.Max.Seconds(),
rowLimitF/agg.Median.Seconds(),
rowLimitF/agg.Min.Seconds())
tw.Flush()
return buf.String()
}
var allStats int64 // atomic
type stats struct {
mu sync.Mutex
tries, ok int
ds []time.Duration
}
func (s *stats) Record(ok bool, d time.Duration) {
s.mu.Lock()
s.tries++
if ok {
s.ok++
}
s.ds = append(s.ds, d)
s.mu.Unlock()
if n := atomic.AddInt64(&allStats, 1); n%1000 == 0 {
log.Printf("Progress: done %d ops", n)
}
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package bigtable is an API to Google Cloud Bigtable.
See https://cloud.google.com/bigtable/docs/ for general product documentation.
See https://godoc.org/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
Setup and Credentials
Use NewClient or NewAdminClient to create a client that can be used to access
the data or admin APIs respectively. Both require credentials that have permission
to access the Cloud Bigtable API.
If your program is run on Google App Engine or Google Compute Engine, using the Application Default Credentials
(https://developers.google.com/accounts/docs/application-default-credentials)
is the simplest option. Those credentials will be used by default when NewClient or NewAdminClient are called.
To use alternate credentials, pass them to NewClient or NewAdminClient using option.WithTokenSource.
For instance, you can use service account credentials by visiting
https://cloud.google.com/console/project/MYPROJECT/apiui/credential,
creating a new OAuth "Client ID", storing the JSON key somewhere accessible, and writing
jsonKey, err := ioutil.ReadFile(pathToKeyFile)
...
config, err := google.JWTConfigFromJSON(jsonKey, bigtable.Scope) // or bigtable.AdminScope, etc.
...
client, err := bigtable.NewClient(ctx, project, instance, option.WithTokenSource(config.TokenSource(ctx)))
...
Here, `google` means the golang.org/x/oauth2/google package
and `option` means the google.golang.org/api/option package.
Reading
The principal way to read from a Bigtable is to use the ReadRows method on *Table.
A RowRange specifies a contiguous portion of a table. A Filter may be provided through
RowFilter to limit or transform the data that is returned.
tbl := client.Open("mytable")
...
// Read all the rows starting with "com.google.",
// but only fetch the columns in the "links" family.
rr := bigtable.PrefixRange("com.google.")
err := tbl.ReadRows(ctx, rr, func(r Row) bool {
// do something with r
return true // keep going
}, bigtable.RowFilter(bigtable.FamilyFilter("links")))
...
To read a single row, use the ReadRow helper method.
r, err := tbl.ReadRow(ctx, "com.google.cloud") // "com.google.cloud" is the entire row key
...
Writing
This API exposes two distinct forms of writing to a Bigtable: a Mutation and a ReadModifyWrite.
The former expresses idempotent operations.
The latter expresses non-idempotent operations and returns the new values of updated cells.
These operations are performed by creating a Mutation or ReadModifyWrite (with NewMutation or NewReadModifyWrite),
building up one or more operations on that, and then using the Apply or ApplyReadModifyWrite
methods on a Table.
For instance, to set a couple of cells in a table,
tbl := client.Open("mytable")
mut := bigtable.NewMutation()
mut.Set("links", "maps.google.com", bigtable.Now(), []byte("1"))
mut.Set("links", "golang.org", bigtable.Now(), []byte("1"))
err := tbl.Apply(ctx, "com.google.cloud", mut)
...
To increment an encoded value in one cell,
tbl := client.Open("mytable")
rmw := bigtable.NewReadModifyWrite()
rmw.Increment("links", "golang.org", 12) // add 12 to the cell in column "links:golang.org"
r, err := tbl.ApplyReadModifyWrite(ctx, "com.google.cloud", rmw)
...
Retries
If a read or write operation encounters a transient error it will be retried until a successful
response, an unretryable error or the context deadline is reached. Non-idempotent writes (where
the timestamp is set to ServerTime) will not be retried. In the case of ReadRows, retried calls
will not re-scan rows that have already been processed.
*/
package bigtable // import "cloud.google.com/go/bigtable"
// Scope constants for authentication credentials.
// These should be used when using credential creation functions such as oauth.NewServiceAccountFromFile.
const (
// Scope is the OAuth scope for Cloud Bigtable data operations.
Scope = "https://www.googleapis.com/auth/bigtable.data"
// ReadonlyScope is the OAuth scope for Cloud Bigtable read-only data operations.
ReadonlyScope = "https://www.googleapis.com/auth/bigtable.readonly"
// AdminScope is the OAuth scope for Cloud Bigtable table admin operations.
AdminScope = "https://www.googleapis.com/auth/bigtable.admin.table"
// InstanceAdminScope is the OAuth scope for Cloud Bigtable instance (and cluster) admin operations.
InstanceAdminScope = "https://www.googleapis.com/auth/bigtable.admin.cluster"
)
// clientUserAgent identifies the version of this package.
// It should be bumped upon significant changes only.
const clientUserAgent = "cbt-go/20180601"
// resourcePrefixHeader is the name of the metadata header used to indicate
// the resource being operated on.
const resourcePrefixHeader = "google-cloud-resource-prefix"
/*
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"context"
"errors"
"flag"
"fmt"
"strings"
"time"
"cloud.google.com/go/bigtable/bttest"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
var legacyUseProd string
var integrationConfig IntegrationTestConfig
func init() {
c := &integrationConfig
flag.BoolVar(&c.UseProd, "it.use-prod", false, "Use remote bigtable instead of local emulator")
flag.StringVar(&c.AdminEndpoint, "it.admin-endpoint", "", "Admin api host and port")
flag.StringVar(&c.DataEndpoint, "it.data-endpoint", "", "Data api host and port")
flag.StringVar(&c.Project, "it.project", "", "Project to use for integration test")
flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use")
flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use")
flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create")
// Backwards compat
flag.StringVar(&legacyUseProd, "use_prod", "", `DEPRECATED: if set to "proj,instance,table", run integration test against production`)
}
// IntegrationTestConfig contains parameters to pick and setup a IntegrationEnv for testing
type IntegrationTestConfig struct {
UseProd bool
AdminEndpoint string
DataEndpoint string
Project string
Instance string
Cluster string
Table string
}
// IntegrationEnv represents a testing environment.
// The environment can be implemented using production or an emulator
type IntegrationEnv interface {
Config() IntegrationTestConfig
NewAdminClient() (*AdminClient, error)
// NewInstanceAdminClient will return nil if instance administration is unsupported in this environment
NewInstanceAdminClient() (*InstanceAdminClient, error)
NewClient() (*Client, error)
Close()
}
// NewIntegrationEnv creates a new environment based on the command line args
func NewIntegrationEnv() (IntegrationEnv, error) {
c := integrationConfig
if legacyUseProd != "" {
fmt.Println("WARNING: using legacy commandline arg -use_prod, please switch to -it.*")
parts := strings.SplitN(legacyUseProd, ",", 3)
c.UseProd = true
c.Project = parts[0]
c.Instance = parts[1]
c.Table = parts[2]
}
if integrationConfig.UseProd {
return NewProdEnv(c)
}
return NewEmulatedEnv(c)
}
// EmulatedEnv encapsulates the state of an emulator
type EmulatedEnv struct {
config IntegrationTestConfig
server *bttest.Server
}
// NewEmulatedEnv builds and starts the emulator based environment
func NewEmulatedEnv(config IntegrationTestConfig) (*EmulatedEnv, error) {
srv, err := bttest.NewServer("localhost:0", grpc.MaxRecvMsgSize(200<<20), grpc.MaxSendMsgSize(100<<20))
if err != nil {
return nil, err
}
if config.Project == "" {
config.Project = "project"
}
if config.Instance == "" {
config.Instance = "instance"
}
if config.Table == "" {
config.Table = "mytable"
}
config.AdminEndpoint = srv.Addr
config.DataEndpoint = srv.Addr
env := &EmulatedEnv{
config: config,
server: srv,
}
return env, nil
}
// Close stops & cleans up the emulator
func (e *EmulatedEnv) Close() {
e.server.Close()
}
// Config gets the config used to build this environment
func (e *EmulatedEnv) Config() IntegrationTestConfig {
return e.config
}
// NewAdminClient builds a new connected admin client for this environment
func (e *EmulatedEnv) NewAdminClient() (*AdminClient, error) {
timeout := 20 * time.Second
ctx, _ := context.WithTimeout(context.Background(), timeout)
conn, err := grpc.Dial(e.server.Addr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, err
}
return NewAdminClient(ctx, e.config.Project, e.config.Instance, option.WithGRPCConn(conn))
}
// NewInstanceAdminClient returns nil for the emulated environment since the API is not implemented.
func (e *EmulatedEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {
return nil, nil
}
// NewClient builds a new connected data client for this environment
func (e *EmulatedEnv) NewClient() (*Client, error) {
timeout := 20 * time.Second
ctx, _ := context.WithTimeout(context.Background(), timeout)
conn, err := grpc.Dial(e.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)))
if err != nil {
return nil, err
}
return NewClient(ctx, e.config.Project, e.config.Instance, option.WithGRPCConn(conn))
}
// ProdEnv encapsulates the state necessary to connect to the external Bigtable service
type ProdEnv struct {
config IntegrationTestConfig
}
// NewProdEnv builds the environment representation
func NewProdEnv(config IntegrationTestConfig) (*ProdEnv, error) {
if config.Project == "" {
return nil, errors.New("Project not set")
}
if config.Instance == "" {
return nil, errors.New("Instance not set")
}
if config.Table == "" {
return nil, errors.New("Table not set")
}
return &ProdEnv{config}, nil
}
// Close is a no-op for production environments
func (e *ProdEnv) Close() {}
// Config gets the config used to build this environment
func (e *ProdEnv) Config() IntegrationTestConfig {
return e.config
}
// NewAdminClient builds a new connected admin client for this environment
func (e *ProdEnv) NewAdminClient() (*AdminClient, error) {
var clientOpts []option.ClientOption
if endpoint := e.config.AdminEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}
return NewAdminClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
}
// NewInstanceAdminClient returns a new connected instance admin client for this environment
func (e *ProdEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {
var clientOpts []option.ClientOption
if endpoint := e.config.AdminEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}
return NewInstanceAdminClient(context.Background(), e.config.Project, clientOpts...)
}
// NewClient builds a connected data client for this environment
func (e *ProdEnv) NewClient() (*Client, error) {
var clientOpts []option.ClientOption
if endpoint := e.config.DataEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}
return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"fmt"
"strings"
"time"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
)
// A Filter represents a row filter.
type Filter interface {
String() string
proto() *btpb.RowFilter
}
// ChainFilters returns a filter that applies a sequence of filters.
func ChainFilters(sub ...Filter) Filter { return chainFilter{sub} }
type chainFilter struct {
sub []Filter
}
func (cf chainFilter) String() string {
var ss []string
for _, sf := range cf.sub {
ss = append(ss, sf.String())
}
return "(" + strings.Join(ss, " | ") + ")"
}
func (cf chainFilter) proto() *btpb.RowFilter {
chain := &btpb.RowFilter_Chain{}
for _, sf := range cf.sub {
chain.Filters = append(chain.Filters, sf.proto())
}
return &btpb.RowFilter{
Filter: &btpb.RowFilter_Chain_{Chain: chain},
}
}
// InterleaveFilters returns a filter that applies a set of filters in parallel
// and interleaves the results.
func InterleaveFilters(sub ...Filter) Filter { return interleaveFilter{sub} }
type interleaveFilter struct {
sub []Filter
}
func (ilf interleaveFilter) String() string {
var ss []string
for _, sf := range ilf.sub {
ss = append(ss, sf.String())
}
return "(" + strings.Join(ss, " + ") + ")"
}
func (ilf interleaveFilter) proto() *btpb.RowFilter {
inter := &btpb.RowFilter_Interleave{}
for _, sf := range ilf.sub {
inter.Filters = append(inter.Filters, sf.proto())
}
return &btpb.RowFilter{
Filter: &btpb.RowFilter_Interleave_{Interleave: inter},
}
}
// RowKeyFilter returns a filter that matches cells from rows whose
// key matches the provided RE2 pattern.
// See https://github.com/google/re2/wiki/Syntax for the accepted syntax.
func RowKeyFilter(pattern string) Filter { return rowKeyFilter(pattern) }
type rowKeyFilter string
func (rkf rowKeyFilter) String() string { return fmt.Sprintf("row(%s)", string(rkf)) }
func (rkf rowKeyFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_RowKeyRegexFilter{RowKeyRegexFilter: []byte(rkf)}}
}
// FamilyFilter returns a filter that matches cells whose family name
// matches the provided RE2 pattern.
// See https://github.com/google/re2/wiki/Syntax for the accepted syntax.
func FamilyFilter(pattern string) Filter { return familyFilter(pattern) }
type familyFilter string
func (ff familyFilter) String() string { return fmt.Sprintf("col(%s:)", string(ff)) }
func (ff familyFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{FamilyNameRegexFilter: string(ff)}}
}
// ColumnFilter returns a filter that matches cells whose column name
// matches the provided RE2 pattern.
// See https://github.com/google/re2/wiki/Syntax for the accepted syntax.
func ColumnFilter(pattern string) Filter { return columnFilter(pattern) }
type columnFilter string
func (cf columnFilter) String() string { return fmt.Sprintf("col(.*:%s)", string(cf)) }
func (cf columnFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{ColumnQualifierRegexFilter: []byte(cf)}}
}
// ValueFilter returns a filter that matches cells whose value
// matches the provided RE2 pattern.
// See https://github.com/google/re2/wiki/Syntax for the accepted syntax.
func ValueFilter(pattern string) Filter { return valueFilter(pattern) }
type valueFilter string
func (vf valueFilter) String() string { return fmt.Sprintf("value_match(%s)", string(vf)) }
func (vf valueFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{ValueRegexFilter: []byte(vf)}}
}
// LatestNFilter returns a filter that matches the most recent N cells in each column.
func LatestNFilter(n int) Filter { return latestNFilter(n) }
type latestNFilter int32
func (lnf latestNFilter) String() string { return fmt.Sprintf("col(*,%d)", lnf) }
func (lnf latestNFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerColumnLimitFilter{CellsPerColumnLimitFilter: int32(lnf)}}
}
// StripValueFilter returns a filter that replaces each value with the empty string.
func StripValueFilter() Filter { return stripValueFilter{} }
type stripValueFilter struct{}
func (stripValueFilter) String() string { return "strip_value()" }
func (stripValueFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_StripValueTransformer{StripValueTransformer: true}}
}
// TimestampRangeFilter returns a filter that matches any cells whose timestamp is within the given time bounds. A zero
// time means no bound.
// The timestamp will be truncated to millisecond granularity.
func TimestampRangeFilter(startTime time.Time, endTime time.Time) Filter {
trf := timestampRangeFilter{}
if !startTime.IsZero() {
trf.startTime = Time(startTime)
}
if !endTime.IsZero() {
trf.endTime = Time(endTime)
}
return trf
}
// TimestampRangeFilterMicros returns a filter that matches any cells whose timestamp is within the given time bounds,
// specified in units of microseconds since 1 January 1970. A zero value for the end time is interpreted as no bound.
// The timestamp will be truncated to millisecond granularity.
func TimestampRangeFilterMicros(startTime Timestamp, endTime Timestamp) Filter {
return timestampRangeFilter{startTime, endTime}
}
type timestampRangeFilter struct {
startTime Timestamp
endTime Timestamp
}
func (trf timestampRangeFilter) String() string {
return fmt.Sprintf("timestamp_range(%v,%v)", trf.startTime, trf.endTime)
}
func (trf timestampRangeFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{
Filter: &btpb.RowFilter_TimestampRangeFilter{TimestampRangeFilter: &btpb.TimestampRange{
StartTimestampMicros: int64(trf.startTime.TruncateToMilliseconds()),
EndTimestampMicros: int64(trf.endTime.TruncateToMilliseconds()),
},
}}
}
// ColumnRangeFilter returns a filter that matches a contiguous range of columns within a single
// family, as specified by an inclusive start qualifier and exclusive end qualifier.
func ColumnRangeFilter(family, start, end string) Filter {
return columnRangeFilter{family, start, end}
}
type columnRangeFilter struct {
family string
start string
end string
}
func (crf columnRangeFilter) String() string {
return fmt.Sprintf("columnRangeFilter(%s,%s,%s)", crf.family, crf.start, crf.end)
}
func (crf columnRangeFilter) proto() *btpb.RowFilter {
r := &btpb.ColumnRange{FamilyName: crf.family}
if crf.start != "" {
r.StartQualifier = &btpb.ColumnRange_StartQualifierClosed{StartQualifierClosed: []byte(crf.start)}
}
if crf.end != "" {
r.EndQualifier = &btpb.ColumnRange_EndQualifierOpen{EndQualifierOpen: []byte(crf.end)}
}
return &btpb.RowFilter{Filter: &btpb.RowFilter_ColumnRangeFilter{ColumnRangeFilter: r}}
}
// ValueRangeFilter returns a filter that matches cells with values that fall within
// the given range, as specified by an inclusive start value and exclusive end value.
func ValueRangeFilter(start, end []byte) Filter {
return valueRangeFilter{start, end}
}
type valueRangeFilter struct {
start []byte
end []byte
}
func (vrf valueRangeFilter) String() string {
return fmt.Sprintf("valueRangeFilter(%s,%s)", vrf.start, vrf.end)
}
func (vrf valueRangeFilter) proto() *btpb.RowFilter {
r := &btpb.ValueRange{}
if vrf.start != nil {
r.StartValue = &btpb.ValueRange_StartValueClosed{StartValueClosed: vrf.start}
}
if vrf.end != nil {
r.EndValue = &btpb.ValueRange_EndValueOpen{EndValueOpen: vrf.end}
}
return &btpb.RowFilter{Filter: &btpb.RowFilter_ValueRangeFilter{ValueRangeFilter: r}}
}
// ConditionFilter returns a filter that evaluates to one of two possible filters depending
// on whether or not the given predicate filter matches at least one cell.
// If the matched filter is nil then no results will be returned.
// IMPORTANT NOTE: The predicate filter does not execute atomically with the
// true and false filters, which may lead to inconsistent or unexpected
// results. Additionally, condition filters have poor performance, especially
// when filters are set for the false condition.
func ConditionFilter(predicateFilter, trueFilter, falseFilter Filter) Filter {
return conditionFilter{predicateFilter, trueFilter, falseFilter}
}
type conditionFilter struct {
predicateFilter Filter
trueFilter Filter
falseFilter Filter
}
func (cf conditionFilter) String() string {
return fmt.Sprintf("conditionFilter(%s,%s,%s)", cf.predicateFilter, cf.trueFilter, cf.falseFilter)
}
func (cf conditionFilter) proto() *btpb.RowFilter {
var tf *btpb.RowFilter
var ff *btpb.RowFilter
if cf.trueFilter != nil {
tf = cf.trueFilter.proto()
}
if cf.falseFilter != nil {
ff = cf.falseFilter.proto()
}
return &btpb.RowFilter{
Filter: &btpb.RowFilter_Condition_{Condition: &btpb.RowFilter_Condition{
PredicateFilter: cf.predicateFilter.proto(),
TrueFilter: tf,
FalseFilter: ff,
}}}
}
// CellsPerRowOffsetFilter returns a filter that skips the first N cells of each row, matching all subsequent cells.
func CellsPerRowOffsetFilter(n int) Filter {
return cellsPerRowOffsetFilter(n)
}
type cellsPerRowOffsetFilter int32
func (cof cellsPerRowOffsetFilter) String() string {
return fmt.Sprintf("cells_per_row_offset(%d)", cof)
}
func (cof cellsPerRowOffsetFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerRowOffsetFilter{CellsPerRowOffsetFilter: int32(cof)}}
}
// CellsPerRowLimitFilter returns a filter that matches only the first N cells of each row.
func CellsPerRowLimitFilter(n int) Filter {
return cellsPerRowLimitFilter(n)
}
type cellsPerRowLimitFilter int32
func (clf cellsPerRowLimitFilter) String() string {
return fmt.Sprintf("cells_per_row_limit(%d)", clf)
}
func (clf cellsPerRowLimitFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_CellsPerRowLimitFilter{CellsPerRowLimitFilter: int32(clf)}}
}
// RowSampleFilter returns a filter that matches a row with a probability of p (must be in the interval (0, 1)).
func RowSampleFilter(p float64) Filter {
return rowSampleFilter(p)
}
type rowSampleFilter float64
func (rsf rowSampleFilter) String() string {
return fmt.Sprintf("filter(%f)", rsf)
}
func (rsf rowSampleFilter) proto() *btpb.RowFilter {
return &btpb.RowFilter{Filter: &btpb.RowFilter_RowSampleFilter{RowSampleFilter: float64(rsf)}}
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"fmt"
"strings"
"time"
durpb "github.com/golang/protobuf/ptypes/duration"
bttdpb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
)
// A GCPolicy represents a rule that determines which cells are eligible for garbage collection.
type GCPolicy interface {
String() string
proto() *bttdpb.GcRule
}
// IntersectionPolicy returns a GC policy that only applies when all its sub-policies apply.
func IntersectionPolicy(sub ...GCPolicy) GCPolicy { return intersectionPolicy{sub} }
type intersectionPolicy struct {
sub []GCPolicy
}
func (ip intersectionPolicy) String() string {
var ss []string
for _, sp := range ip.sub {
ss = append(ss, sp.String())
}
return "(" + strings.Join(ss, " && ") + ")"
}
func (ip intersectionPolicy) proto() *bttdpb.GcRule {
inter := &bttdpb.GcRule_Intersection{}
for _, sp := range ip.sub {
inter.Rules = append(inter.Rules, sp.proto())
}
return &bttdpb.GcRule{
Rule: &bttdpb.GcRule_Intersection_{Intersection: inter},
}
}
// UnionPolicy returns a GC policy that applies when any of its sub-policies apply.
func UnionPolicy(sub ...GCPolicy) GCPolicy { return unionPolicy{sub} }
type unionPolicy struct {
sub []GCPolicy
}
func (up unionPolicy) String() string {
var ss []string
for _, sp := range up.sub {
ss = append(ss, sp.String())
}
return "(" + strings.Join(ss, " || ") + ")"
}
func (up unionPolicy) proto() *bttdpb.GcRule {
union := &bttdpb.GcRule_Union{}
for _, sp := range up.sub {
union.Rules = append(union.Rules, sp.proto())
}
return &bttdpb.GcRule{
Rule: &bttdpb.GcRule_Union_{Union: union},
}
}
// MaxVersionsPolicy returns a GC policy that applies to all versions of a cell
// except for the most recent n.
func MaxVersionsPolicy(n int) GCPolicy { return maxVersionsPolicy(n) }
type maxVersionsPolicy int
func (mvp maxVersionsPolicy) String() string { return fmt.Sprintf("versions() > %d", int(mvp)) }
func (mvp maxVersionsPolicy) proto() *bttdpb.GcRule {
return &bttdpb.GcRule{Rule: &bttdpb.GcRule_MaxNumVersions{MaxNumVersions: int32(mvp)}}
}
// MaxAgePolicy returns a GC policy that applies to all cells
// older than the given age.
func MaxAgePolicy(d time.Duration) GCPolicy { return maxAgePolicy(d) }
type maxAgePolicy time.Duration
var units = []struct {
d time.Duration
suffix string
}{
{24 * time.Hour, "d"},
{time.Hour, "h"},
{time.Minute, "m"},
}
func (ma maxAgePolicy) String() string {
d := time.Duration(ma)
for _, u := range units {
if d%u.d == 0 {
return fmt.Sprintf("age() > %d%s", d/u.d, u.suffix)
}
}
return fmt.Sprintf("age() > %d", d/time.Microsecond)
}
func (ma maxAgePolicy) proto() *bttdpb.GcRule {
// This doesn't handle overflows, etc.
// Fix this if people care about GC policies over 290 years.
ns := time.Duration(ma).Nanoseconds()
return &bttdpb.GcRule{
Rule: &bttdpb.GcRule_MaxAge{MaxAge: &durpb.Duration{
Seconds: ns / 1e9,
Nanos: int32(ns % 1e9),
}},
}
}
type noGCPolicy struct{}
func (n noGCPolicy) String() string { return "" }
func (n noGCPolicy) proto() *bttdpb.GcRule { return &bttdpb.GcRule{Rule: nil} }
// NoGcPolicy applies to all cells setting maxage and maxversions to nil implies no gc policies
func NoGcPolicy() GCPolicy { return noGCPolicy{} }
// GCRuleToString converts the given GcRule proto to a user-visible string.
func GCRuleToString(rule *bttdpb.GcRule) string {
if rule == nil {
return "<never>"
}
switch r := rule.Rule.(type) {
case *bttdpb.GcRule_MaxNumVersions:
return MaxVersionsPolicy(int(r.MaxNumVersions)).String()
case *bttdpb.GcRule_MaxAge:
return MaxAgePolicy(time.Duration(r.MaxAge.Seconds) * time.Second).String()
case *bttdpb.GcRule_Intersection_:
return joinRules(r.Intersection.Rules, " && ")
case *bttdpb.GcRule_Union_:
return joinRules(r.Union.Rules, " || ")
default:
return ""
}
}
func joinRules(rules []*bttdpb.GcRule, sep string) string {
var chunks []string
for _, r := range rules {
chunks = append(chunks, GCRuleToString(r))
}
return "(" + strings.Join(chunks, sep) + ")"
}
/*
Copyright 2017 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"testing"
"time"
bttdpb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
)
func TestGcRuleToString(t *testing.T) {
intersection := IntersectionPolicy(MaxVersionsPolicy(5), MaxVersionsPolicy(10), MaxAgePolicy(16*time.Hour))
var tests = []struct {
proto *bttdpb.GcRule
want string
}{
{MaxAgePolicy(72 * time.Hour).proto(), "age() > 3d"},
{MaxVersionsPolicy(5).proto(), "versions() > 5"},
{intersection.proto(), "(versions() > 5 && versions() > 10 && age() > 16h)"},
{UnionPolicy(intersection, MaxAgePolicy(72*time.Hour)).proto(),
"((versions() > 5 && versions() > 10 && age() > 16h) || age() > 3d)"},
}
for _, test := range tests {
got := GCRuleToString(test.proto)
if got != test.want {
t.Errorf("got gc rule string: %v, wanted: %v", got, test.want)
}
}
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cbtconfig encapsulates common code for reading configuration from .cbtrc and gcloud.
package cbtconfig
import (
"bufio"
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"time"
"golang.org/x/oauth2"
"google.golang.org/grpc/credentials"
)
// Config represents a configuration.
type Config struct {
Project, Instance string // required
Creds string // optional
AdminEndpoint string // optional
DataEndpoint string // optional
CertFile string // optional
UserAgent string // optional
TokenSource oauth2.TokenSource // derived
TLSCreds credentials.TransportCredentials // derived
}
// RequiredFlags describes the flag requirements for a cbt command.
type RequiredFlags uint
const (
// NoneRequired specifies that not flags are required.
NoneRequired RequiredFlags = 0
// ProjectRequired specifies that the -project flag is required.
ProjectRequired RequiredFlags = 1 << iota
// InstanceRequired specifies that the -instance flag is required.
InstanceRequired
// ProjectAndInstanceRequired specifies that both -project and -instance is required.
ProjectAndInstanceRequired = ProjectRequired | InstanceRequired
)
// RegisterFlags registers a set of standard flags for this config.
// It should be called before flag.Parse.
func (c *Config) RegisterFlags() {
flag.StringVar(&c.Project, "project", c.Project, "project ID, if unset uses gcloud configured project")
flag.StringVar(&c.Instance, "instance", c.Instance, "Cloud Bigtable instance")
flag.StringVar(&c.Creds, "creds", c.Creds, "if set, use application credentials in this file")
flag.StringVar(&c.AdminEndpoint, "admin-endpoint", c.AdminEndpoint, "Override the admin api endpoint")
flag.StringVar(&c.DataEndpoint, "data-endpoint", c.DataEndpoint, "Override the data api endpoint")
flag.StringVar(&c.CertFile, "cert-file", c.CertFile, "Override the TLS certificates file")
flag.StringVar(&c.UserAgent, "user-agent", c.UserAgent, "Override the user agent string")
}
// CheckFlags checks that the required config values are set.
func (c *Config) CheckFlags(required RequiredFlags) error {
var missing []string
if c.CertFile != "" {
b, err := ioutil.ReadFile(c.CertFile)
if err != nil {
return fmt.Errorf("Failed to load certificates from %s: %v", c.CertFile, err)
}
cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(b) {
return fmt.Errorf("Failed to append certificates from %s", c.CertFile)
}
c.TLSCreds = credentials.NewTLS(&tls.Config{RootCAs: cp})
}
if required != NoneRequired {
c.SetFromGcloud()
}
if required&ProjectRequired != 0 && c.Project == "" {
missing = append(missing, "-project")
}
if required&InstanceRequired != 0 && c.Instance == "" {
missing = append(missing, "-instance")
}
if len(missing) > 0 {
return fmt.Errorf("Missing %s", strings.Join(missing, " and "))
}
return nil
}
// Filename returns the filename consulted for standard configuration.
func Filename() string {
// TODO(dsymonds): Might need tweaking for Windows.
return filepath.Join(os.Getenv("HOME"), ".cbtrc")
}
// Load loads a .cbtrc file.
// If the file is not present, an empty config is returned.
func Load() (*Config, error) {
filename := Filename()
data, err := ioutil.ReadFile(filename)
if err != nil {
// silent fail if the file isn't there
if os.IsNotExist(err) {
return &Config{}, nil
}
return nil, fmt.Errorf("Reading %s: %v", filename, err)
}
c := new(Config)
s := bufio.NewScanner(bytes.NewReader(data))
for s.Scan() {
line := s.Text()
i := strings.Index(line, "=")
if i < 0 {
return nil, fmt.Errorf("Bad line in %s: %q", filename, line)
}
key, val := strings.TrimSpace(line[:i]), strings.TrimSpace(line[i+1:])
switch key {
default:
return nil, fmt.Errorf("Unknown key in %s: %q", filename, key)
case "project":
c.Project = val
case "instance":
c.Instance = val
case "creds":
c.Creds = val
case "admin-endpoint":
c.AdminEndpoint = val
case "data-endpoint":
c.DataEndpoint = val
case "cert-file":
c.CertFile = val
case "user-agent":
c.UserAgent = val
}
}
return c, s.Err()
}
// GcloudCredential holds gcloud credential information.
type GcloudCredential struct {
AccessToken string `json:"access_token"`
Expiry time.Time `json:"token_expiry"`
}
// Token creates an oauth2 token using gcloud credentials.
func (cred *GcloudCredential) Token() *oauth2.Token {
return &oauth2.Token{AccessToken: cred.AccessToken, TokenType: "Bearer", Expiry: cred.Expiry}
}
// GcloudConfig holds gcloud configuration values.
type GcloudConfig struct {
Configuration struct {
Properties struct {
Core struct {
Project string `json:"project"`
} `json:"core"`
} `json:"properties"`
} `json:"configuration"`
Credential GcloudCredential `json:"credential"`
}
// GcloudCmdTokenSource holds the comamnd arguments. It is only intended to be set by the program.
// TODO(deklerk) Can this be unexported?
type GcloudCmdTokenSource struct {
Command string
Args []string
}
// Token implements the oauth2.TokenSource interface
func (g *GcloudCmdTokenSource) Token() (*oauth2.Token, error) {
gcloudConfig, err := LoadGcloudConfig(g.Command, g.Args)
if err != nil {
return nil, err
}
return gcloudConfig.Credential.Token(), nil
}
// LoadGcloudConfig retrieves the gcloud configuration values we need use via the
// 'config-helper' command
func LoadGcloudConfig(gcloudCmd string, gcloudCmdArgs []string) (*GcloudConfig, error) {
out, err := exec.Command(gcloudCmd, gcloudCmdArgs...).Output()
if err != nil {
return nil, fmt.Errorf("Could not retrieve gcloud configuration")
}
var gcloudConfig GcloudConfig
if err := json.Unmarshal(out, &gcloudConfig); err != nil {
return nil, fmt.Errorf("Could not parse gcloud configuration")
}
return &gcloudConfig, nil
}
// SetFromGcloud retrieves and sets any missing config values from the gcloud
// configuration if possible possible
func (c *Config) SetFromGcloud() error {
if c.Creds == "" {
c.Creds = os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
if c.Creds == "" {
log.Printf("-creds flag unset, will use gcloud credential")
}
} else {
os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", c.Creds)
}
if c.Project == "" {
log.Printf("-project flag unset, will use gcloud active project")
}
if c.Creds != "" && c.Project != "" {
return nil
}
gcloudCmd := "gcloud"
if runtime.GOOS == "windows" {
gcloudCmd = gcloudCmd + ".cmd"
}
gcloudCmdArgs := []string{"config", "config-helper",
"--format=json(configuration.properties.core.project,credential)"}
gcloudConfig, err := LoadGcloudConfig(gcloudCmd, gcloudCmdArgs)
if err != nil {
return err
}
if c.Project == "" && gcloudConfig.Configuration.Properties.Core.Project != "" {
log.Printf("gcloud active project is \"%s\"",
gcloudConfig.Configuration.Properties.Core.Project)
c.Project = gcloudConfig.Configuration.Properties.Core.Project
}
if c.Creds == "" {
c.TokenSource = oauth2.ReuseTokenSource(
gcloudConfig.Credential.Token(),
&GcloudCmdTokenSource{Command: gcloudCmd, Args: gcloudCmdArgs})
}
return nil
}
/*
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package gax is a snapshot from github.com/googleapis/gax-go with minor modifications.
package gax
import (
"time"
"google.golang.org/grpc/codes"
)
// CallOption is a generic interface for modifying the behavior of outbound calls.
type CallOption interface {
Resolve(*CallSettings)
}
type callOptions []CallOption
// Resolve resolves all call options individually.
func (opts callOptions) Resolve(s *CallSettings) *CallSettings {
for _, opt := range opts {
opt.Resolve(s)
}
return s
}
// CallSettings encapsulates the call settings for a particular API call.
type CallSettings struct {
Timeout time.Duration
RetrySettings RetrySettings
}
// RetrySettings are per-call configurable settings for retrying upon transient failure.
type RetrySettings struct {
RetryCodes map[codes.Code]bool
BackoffSettings BackoffSettings
}
// BackoffSettings are parameters to the exponential backoff algorithm for retrying.
type BackoffSettings struct {
DelayTimeoutSettings MultipliableDuration
RPCTimeoutSettings MultipliableDuration
}
// MultipliableDuration defines parameters for backoff settings.
type MultipliableDuration struct {
Initial time.Duration
Max time.Duration
Multiplier float64
}
// Resolve merges the receiver CallSettings into the given CallSettings.
func (w CallSettings) Resolve(s *CallSettings) {
s.Timeout = w.Timeout
s.RetrySettings = w.RetrySettings
s.RetrySettings.RetryCodes = make(map[codes.Code]bool, len(w.RetrySettings.RetryCodes))
for key, value := range w.RetrySettings.RetryCodes {
s.RetrySettings.RetryCodes[key] = value
}
}
type withRetryCodes []codes.Code
func (w withRetryCodes) Resolve(s *CallSettings) {
s.RetrySettings.RetryCodes = make(map[codes.Code]bool)
for _, code := range w {
s.RetrySettings.RetryCodes[code] = true
}
}
// WithRetryCodes sets a list of Google API canonical error codes upon which a
// retry should be attempted.
func WithRetryCodes(retryCodes []codes.Code) CallOption {
return withRetryCodes(retryCodes)
}
type withDelayTimeoutSettings MultipliableDuration
func (w withDelayTimeoutSettings) Resolve(s *CallSettings) {
s.RetrySettings.BackoffSettings.DelayTimeoutSettings = MultipliableDuration(w)
}
// WithDelayTimeoutSettings specifies:
// - The initial delay time, in milliseconds, between the completion of
// the first failed request and the initiation of the first retrying
// request.
// - The multiplier by which to increase the delay time between the
// completion of failed requests, and the initiation of the subsequent
// retrying request.
// - The maximum delay time, in milliseconds, between requests. When this
// value is reached, `RetryDelayMultiplier` will no longer be used to
// increase delay time.
func WithDelayTimeoutSettings(initial time.Duration, max time.Duration, multiplier float64) CallOption {
return withDelayTimeoutSettings(MultipliableDuration{initial, max, multiplier})
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package gax is a snapshot from github.com/googleapis/gax-go with minor modifications.
package gax
import (
"context"
"log"
"math/rand"
"os"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// Logger is a logger that logs to stderr.
var Logger = log.New(os.Stderr, "", log.LstdFlags)
// APICall is a user defined call stub.
type APICall func(context.Context) error
// scaleDuration returns the product of a and mult.
func scaleDuration(a time.Duration, mult float64) time.Duration {
ns := float64(a) * mult
return time.Duration(ns)
}
// invokeWithRetry calls stub using an exponential backoff retry mechanism
// based on the values provided in callSettings.
func invokeWithRetry(ctx context.Context, stub APICall, callSettings CallSettings) error {
retrySettings := callSettings.RetrySettings
backoffSettings := callSettings.RetrySettings.BackoffSettings
delay := backoffSettings.DelayTimeoutSettings.Initial
for {
// If the deadline is exceeded...
if ctx.Err() != nil {
return ctx.Err()
}
err := stub(ctx)
code := grpc.Code(err)
if code == codes.OK {
return nil
}
if !retrySettings.RetryCodes[code] {
return err
}
// Sleep a random amount up to the current delay
d := time.Duration(rand.Int63n(int64(delay)))
delayCtx, _ := context.WithTimeout(ctx, delay)
if Logger != nil {
Logger.Printf("Retryable error: %v, retrying in %v", err, d)
}
<-delayCtx.Done()
delay = scaleDuration(delay, backoffSettings.DelayTimeoutSettings.Multiplier)
if delay > backoffSettings.DelayTimeoutSettings.Max {
delay = backoffSettings.DelayTimeoutSettings.Max
}
}
}
// Invoke calls stub with a child of context modified by the specified options.
func Invoke(ctx context.Context, stub APICall, opts ...CallOption) error {
settings := &CallSettings{}
callOptions(opts).Resolve(settings)
if len(settings.RetrySettings.RetryCodes) > 0 {
return invokeWithRetry(ctx, stub, *settings)
}
return stub(ctx)
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gax
import (
"context"
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestRandomizedDelays(t *testing.T) {
max := 200 * time.Millisecond
settings := []CallOption{
WithRetryCodes([]codes.Code{codes.Unavailable, codes.DeadlineExceeded}),
WithDelayTimeoutSettings(10*time.Millisecond, max, 1.5),
}
deadline := time.Now().Add(1 * time.Second)
ctx, _ := context.WithDeadline(context.Background(), deadline)
var invokeTime time.Time
_ = Invoke(ctx, func(childCtx context.Context) error {
// Keep failing, make sure we never slept more than max (plus a fudge factor)
if !invokeTime.IsZero() {
if got, want := time.Since(invokeTime), max; got > (want + 20*time.Millisecond) {
t.Logf("Slept too long. Got: %v, want: %v", got, max)
}
}
invokeTime = time.Now()
// Workaround for `go vet`: https://github.com/grpc/grpc-go/issues/90
errf := status.Errorf
return errf(codes.Unavailable, "")
}, settings...)
}
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package option contains common code for dealing with client options.
package option
import (
"fmt"
"os"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
// DefaultClientOptions returns the default client options to use for the
// client's gRPC connection.
func DefaultClientOptions(endpoint, scope, userAgent string) ([]option.ClientOption, error) {
var o []option.ClientOption
// Check the environment variables for the bigtable emulator.
// Dial it directly and don't pass any credentials.
if addr := os.Getenv("BIGTABLE_EMULATOR_HOST"); addr != "" {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return nil, fmt.Errorf("emulator grpc.Dial: %v", err)
}
o = []option.ClientOption{option.WithGRPCConn(conn)}
} else {
o = []option.ClientOption{
option.WithEndpoint(endpoint),
option.WithScopes(scope),
option.WithUserAgent(userAgent),
}
}
return o, nil
}
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stat
import (
"bytes"
"encoding/csv"
"fmt"
"io"
"math"
"sort"
"strconv"
"text/tabwriter"
"time"
)
type byDuration []time.Duration
func (data byDuration) Len() int { return len(data) }
func (data byDuration) Swap(i, j int) { data[i], data[j] = data[j], data[i] }
func (data byDuration) Less(i, j int) bool { return data[i] < data[j] }
// quantile returns a value representing the kth of q quantiles.
// May alter the order of data.
func quantile(data []time.Duration, k, q int) (quantile time.Duration, ok bool) {
if len(data) < 1 {
return 0, false
}
if k > q {
return 0, false
}
if k < 0 || q < 1 {
return 0, false
}
sort.Sort(byDuration(data))
if k == 0 {
return data[0], true
}
if k == q {
return data[len(data)-1], true
}
bucketSize := float64(len(data)-1) / float64(q)
i := float64(k) * bucketSize
lower := int(math.Trunc(i))
var upper int
if i > float64(lower) && lower+1 < len(data) {
// If the quantile lies between two elements
upper = lower + 1
} else {
upper = lower
}
weightUpper := i - float64(lower)
weightLower := 1 - weightUpper
return time.Duration(weightLower*float64(data[lower]) + weightUpper*float64(data[upper])), true
}
// Aggregate is an aggregate of latencies.
type Aggregate struct {
Name string
Count, Errors int
Min, Median, Max time.Duration
P75, P90, P95, P99 time.Duration // percentiles
}
// NewAggregate constructs an aggregate from latencies. Returns nil if latencies does not contain aggregateable data.
func NewAggregate(name string, latencies []time.Duration, errorCount int) *Aggregate {
agg := Aggregate{Name: name, Count: len(latencies), Errors: errorCount}
if len(latencies) == 0 {
return nil
}
var ok bool
if agg.Min, ok = quantile(latencies, 0, 2); !ok {
return nil
}
if agg.Median, ok = quantile(latencies, 1, 2); !ok {
return nil
}
if agg.Max, ok = quantile(latencies, 2, 2); !ok {
return nil
}
if agg.P75, ok = quantile(latencies, 75, 100); !ok {
return nil
}
if agg.P90, ok = quantile(latencies, 90, 100); !ok {
return nil
}
if agg.P95, ok = quantile(latencies, 95, 100); !ok {
return nil
}
if agg.P99, ok = quantile(latencies, 99, 100); !ok {
return nil
}
return &agg
}
func (agg *Aggregate) String() string {
if agg == nil {
return "no data"
}
var buf bytes.Buffer
tw := tabwriter.NewWriter(&buf, 0, 0, 1, ' ', 0) // one-space padding
fmt.Fprintf(tw, "min:\t%v\nmedian:\t%v\nmax:\t%v\n95th percentile:\t%v\n99th percentile:\t%v\n",
agg.Min, agg.Median, agg.Max, agg.P95, agg.P99)
tw.Flush()
return buf.String()
}
// WriteCSV writes a csv file to the given Writer,
// with a header row and one row per aggregate.
func WriteCSV(aggs []*Aggregate, iow io.Writer) (err error) {
w := csv.NewWriter(iow)
defer func() {
w.Flush()
if err == nil {
err = w.Error()
}
}()
err = w.Write([]string{"name", "count", "errors", "min", "median", "max", "p75", "p90", "p95", "p99"})
if err != nil {
return err
}
for _, agg := range aggs {
err = w.Write([]string{
agg.Name, strconv.Itoa(agg.Count), strconv.Itoa(agg.Errors),
agg.Min.String(), agg.Median.String(), agg.Max.String(),
agg.P75.String(), agg.P90.String(), agg.P95.String(), agg.P99.String(),
})
if err != nil {
return err
}
}
return nil
}
/*
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"bytes"
"fmt"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
)
// A Row is returned by ReadRows. The map is keyed by column family (the prefix
// of the column name before the colon). The values are the returned ReadItems
// for that column family in the order returned by Read.
type Row map[string][]ReadItem
// Key returns the row's key, or "" if the row is empty.
func (r Row) Key() string {
for _, items := range r {
if len(items) > 0 {
return items[0].Row
}
}
return ""
}
// A ReadItem is returned by Read. A ReadItem contains data from a specific row and column.
type ReadItem struct {
Row, Column string
Timestamp Timestamp
Value []byte
}
// The current state of the read rows state machine.
type rrState int64
const (
newRow rrState = iota
rowInProgress
cellInProgress
)
// chunkReader handles cell chunks from the read rows response and combines
// them into full Rows.
type chunkReader struct {
state rrState
curKey []byte
curFam string
curQual []byte
curTS int64
curVal []byte
curRow Row
lastKey string
}
// newChunkReader returns a new chunkReader for handling read rows responses.
func newChunkReader() *chunkReader {
return &chunkReader{state: newRow}
}
// Process takes a cell chunk and returns a new Row if the given chunk
// completes a Row, or nil otherwise.
func (cr *chunkReader) Process(cc *btpb.ReadRowsResponse_CellChunk) (Row, error) {
var row Row
switch cr.state {
case newRow:
if err := cr.validateNewRow(cc); err != nil {
return nil, err
}
cr.curRow = make(Row)
cr.curKey = cc.RowKey
cr.curFam = cc.FamilyName.Value
cr.curQual = cc.Qualifier.Value
cr.curTS = cc.TimestampMicros
row = cr.handleCellValue(cc)
case rowInProgress:
if err := cr.validateRowInProgress(cc); err != nil {
return nil, err
}
if cc.GetResetRow() {
cr.resetToNewRow()
return nil, nil
}
if cc.FamilyName != nil {
cr.curFam = cc.FamilyName.Value
}
if cc.Qualifier != nil {
cr.curQual = cc.Qualifier.Value
}
cr.curTS = cc.TimestampMicros
row = cr.handleCellValue(cc)
case cellInProgress:
if err := cr.validateCellInProgress(cc); err != nil {
return nil, err
}
if cc.GetResetRow() {
cr.resetToNewRow()
return nil, nil
}
row = cr.handleCellValue(cc)
}
return row, nil
}
// Close must be called after all cell chunks from the response
// have been processed. An error will be returned if the reader is
// in an invalid state, in which case the error should be propagated to the caller.
func (cr *chunkReader) Close() error {
if cr.state != newRow {
return fmt.Errorf("invalid state for end of stream %q", cr.state)
}
return nil
}
// handleCellValue returns a Row if the cell value includes a commit, otherwise nil.
func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row {
if cc.ValueSize > 0 {
// ValueSize is specified so expect a split value of ValueSize bytes
if cr.curVal == nil {
cr.curVal = make([]byte, 0, cc.ValueSize)
}
cr.curVal = append(cr.curVal, cc.Value...)
cr.state = cellInProgress
} else {
// This cell is either the complete value or the last chunk of a split
if cr.curVal == nil {
cr.curVal = cc.Value
} else {
cr.curVal = append(cr.curVal, cc.Value...)
}
cr.finishCell()
if cc.GetCommitRow() {
return cr.commitRow()
}
cr.state = rowInProgress
}
return nil
}
func (cr *chunkReader) finishCell() {
ri := ReadItem{
Row: string(cr.curKey),
Column: string(cr.curFam) + ":" + string(cr.curQual),
Timestamp: Timestamp(cr.curTS),
Value: cr.curVal,
}
cr.curRow[cr.curFam] = append(cr.curRow[cr.curFam], ri)
cr.curVal = nil
}
func (cr *chunkReader) commitRow() Row {
row := cr.curRow
cr.lastKey = cr.curRow.Key()
cr.resetToNewRow()
return row
}
func (cr *chunkReader) resetToNewRow() {
cr.curKey = nil
cr.curFam = ""
cr.curQual = nil
cr.curVal = nil
cr.curRow = nil
cr.curTS = 0
cr.state = newRow
}
func (cr *chunkReader) validateNewRow(cc *btpb.ReadRowsResponse_CellChunk) error {
if cc.GetResetRow() {
return fmt.Errorf("reset_row not allowed between rows")
}
if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil {
return fmt.Errorf("missing key field for new row %v", cc)
}
if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) {
return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey))
}
return nil
}
func (cr *chunkReader) validateRowInProgress(cc *btpb.ReadRowsResponse_CellChunk) error {
if err := cr.validateRowStatus(cc); err != nil {
return err
}
if cc.RowKey != nil && !bytes.Equal(cc.RowKey, cr.curKey) {
return fmt.Errorf("received new row key %q during existing row %q", cc.RowKey, cr.curKey)
}
if cc.FamilyName != nil && cc.Qualifier == nil {
return fmt.Errorf("family name %q specified without a qualifier", cc.FamilyName)
}
return nil
}
func (cr *chunkReader) validateCellInProgress(cc *btpb.ReadRowsResponse_CellChunk) error {
if err := cr.validateRowStatus(cc); err != nil {
return err
}
if cr.curVal == nil {
return fmt.Errorf("no cached cell while CELL_IN_PROGRESS %v", cc)
}
if cc.GetResetRow() == false && cr.isAnyKeyPresent(cc) {
return fmt.Errorf("cell key components found while CELL_IN_PROGRESS %v", cc)
}
return nil
}
func (cr *chunkReader) isAnyKeyPresent(cc *btpb.ReadRowsResponse_CellChunk) bool {
return cc.RowKey != nil ||
cc.FamilyName != nil ||
cc.Qualifier != nil ||
cc.TimestampMicros != 0
}
// Validate a RowStatus, commit or reset, if present.
func (cr *chunkReader) validateRowStatus(cc *btpb.ReadRowsResponse_CellChunk) error {
// Resets can't be specified with any other part of a cell
if cc.GetResetRow() && (cr.isAnyKeyPresent(cc) ||
cc.Value != nil ||
cc.ValueSize != 0 ||
cc.Labels != nil) {
return fmt.Errorf("reset must not be specified with other fields %v", cc)
}
if cc.GetCommitRow() && cc.ValueSize > 0 {
return fmt.Errorf("commit row found in between chunks in a cell")
}
return nil
}
/*
Copyright 2016 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"encoding/json"
"fmt"
"io/ioutil"
"strings"
"testing"
"cloud.google.com/go/internal/testutil"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/wrappers"
btspb "google.golang.org/genproto/googleapis/bigtable/v2"
)
// Indicates that a field in the proto should be omitted, rather than included
// as a wrapped empty string.
const nilStr = "<>"
func TestSingleCell(t *testing.T) {
cr := newChunkReader()
// All in one cell
row, err := cr.Process(cc("rk", "fm", "col", 1, "value", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
if row == nil {
t.Fatalf("Missing row")
}
if len(row["fm"]) != 1 {
t.Fatalf("Family name length mismatch %d, %d", 1, len(row["fm"]))
}
want := []ReadItem{ri("rk", "fm", "col", 1, "value")}
if !testutil.Equal(row["fm"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm"], want)
}
if err := cr.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
}
func TestMultipleCells(t *testing.T) {
cr := newChunkReader()
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false))
mustProcess(t, cr, cc("rs", "fm2", "col1", 0, "val4", 0, false))
row, err := cr.Process(cc("rs", "fm2", "col2", 1, "extralongval5", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
if row == nil {
t.Fatalf("Missing row")
}
want := []ReadItem{
ri("rs", "fm1", "col1", 0, "val1"),
ri("rs", "fm1", "col1", 1, "val2"),
ri("rs", "fm1", "col2", 0, "val3"),
}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
want = []ReadItem{
ri("rs", "fm2", "col1", 0, "val4"),
ri("rs", "fm2", "col2", 1, "extralongval5"),
}
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
}
if err := cr.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
}
func TestSplitCells(t *testing.T) {
cr := newChunkReader()
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "hello ", 11, false))
mustProcess(t, cr, ccData("world", 0, false))
row, err := cr.Process(cc("rs", "fm1", "col2", 0, "val2", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
if row == nil {
t.Fatalf("Missing row")
}
want := []ReadItem{
ri("rs", "fm1", "col1", 0, "hello world"),
ri("rs", "fm1", "col2", 0, "val2"),
}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
if err := cr.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
}
func TestMultipleRows(t *testing.T) {
cr := newChunkReader()
row, err := cr.Process(cc("rs1", "fm1", "col1", 1, "val1", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1")}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2")}
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
}
if err := cr.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
}
func TestBlankQualifier(t *testing.T) {
cr := newChunkReader()
row, err := cr.Process(cc("rs1", "fm1", "", 1, "val1", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want := []ReadItem{ri("rs1", "fm1", "", 1, "val1")}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm1"], want)
}
row, err = cr.Process(cc("rs2", "fm2", "col2", 2, "val2", 0, true))
if err != nil {
t.Fatalf("Processing chunk: %v", err)
}
want = []ReadItem{ri("rs2", "fm2", "col2", 2, "val2")}
if !testutil.Equal(row["fm2"], want) {
t.Fatalf("Incorrect ReadItem: got: %v\nwant: %v\n", row["fm2"], want)
}
if err := cr.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
}
func TestReset(t *testing.T) {
cr := newChunkReader()
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col1", 1, "val2", 0, false))
mustProcess(t, cr, cc("rs", "fm1", "col2", 0, "val3", 0, false))
mustProcess(t, cr, ccReset())
row := mustProcess(t, cr, cc("rs1", "fm1", "col1", 1, "val1", 0, true))
want := []ReadItem{ri("rs1", "fm1", "col1", 1, "val1")}
if !testutil.Equal(row["fm1"], want) {
t.Fatalf("Reset: got: %v\nwant: %v\n", row["fm1"], want)
}
if err := cr.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
}
func TestNewFamEmptyQualifier(t *testing.T) {
cr := newChunkReader()
mustProcess(t, cr, cc("rs", "fm1", "col1", 0, "val1", 0, false))
_, err := cr.Process(cc(nilStr, "fm2", nilStr, 0, "val2", 0, true))
if err == nil {
t.Fatalf("Expected error on second chunk with no qualifier set")
}
}
func mustProcess(t *testing.T, cr *chunkReader, cc *btspb.ReadRowsResponse_CellChunk) Row {
row, err := cr.Process(cc)
if err != nil {
t.Fatal(err)
}
return row
}
// The read rows acceptance test reads a json file specifying a number of tests,
// each consisting of one or more cell chunk text protos and one or more resulting
// cells or errors.
type AcceptanceTest struct {
Tests []TestCase `json:"tests"`
}
type TestCase struct {
Name string `json:"name"`
Chunks []string `json:"chunks"`
Results []TestResult `json:"results"`
}
type TestResult struct {
RK string `json:"rk"`
FM string `json:"fm"`
Qual string `json:"qual"`
TS int64 `json:"ts"`
Value string `json:"value"`
Error bool `json:"error"` // If true, expect an error. Ignore any other field.
}
func TestAcceptance(t *testing.T) {
testJSON, err := ioutil.ReadFile("./testdata/read-rows-acceptance-test.json")
if err != nil {
t.Fatalf("could not open acceptance test file %v", err)
}
var accTest AcceptanceTest
err = json.Unmarshal(testJSON, &accTest)
if err != nil {
t.Fatalf("could not parse acceptance test file: %v", err)
}
for _, test := range accTest.Tests {
runTestCase(t, test)
}
}
func runTestCase(t *testing.T, test TestCase) {
// Increment an index into the result array as we get results
cr := newChunkReader()
var results []TestResult
var seenErr bool
for _, chunkText := range test.Chunks {
// Parse and pass each cell chunk to the ChunkReader
cc := &btspb.ReadRowsResponse_CellChunk{}
err := proto.UnmarshalText(chunkText, cc)
if err != nil {
t.Errorf("[%s] failed to unmarshal text proto: %s\n%s", test.Name, chunkText, err)
return
}
row, err := cr.Process(cc)
if err != nil {
results = append(results, TestResult{Error: true})
seenErr = true
break
} else {
// Turn the Row into TestResults
for fm, ris := range row {
for _, ri := range ris {
tr := TestResult{
RK: ri.Row,
FM: fm,
Qual: strings.Split(ri.Column, ":")[1],
TS: int64(ri.Timestamp),
Value: string(ri.Value),
}
results = append(results, tr)
}
}
}
}
// Only Close if we don't have an error yet, otherwise Close: is expected.
if !seenErr {
err := cr.Close()
if err != nil {
results = append(results, TestResult{Error: true})
}
}
got := toSet(results)
want := toSet(test.Results)
if !testutil.Equal(got, want) {
t.Fatalf("[%s]: got: %v\nwant: %v\n", test.Name, got, want)
}
}
func toSet(res []TestResult) map[TestResult]bool {
set := make(map[TestResult]bool)
for _, tr := range res {
set[tr] = true
}
return set
}
// ri returns a ReadItem for the given components
func ri(rk string, fm string, qual string, ts int64, val string) ReadItem {
return ReadItem{Row: rk, Column: fmt.Sprintf("%s:%s", fm, qual), Value: []byte(val), Timestamp: Timestamp(ts)}
}
// cc returns a CellChunk proto
func cc(rk string, fm string, qual string, ts int64, val string, size int32, commit bool) *btspb.ReadRowsResponse_CellChunk {
// The components of the cell key are wrapped and can be null or empty
var rkWrapper []byte
if rk == nilStr {
rkWrapper = nil
} else {
rkWrapper = []byte(rk)
}
var fmWrapper *wrappers.StringValue
if fm != nilStr {
fmWrapper = &wrappers.StringValue{Value: fm}
} else {
fmWrapper = nil
}
var qualWrapper *wrappers.BytesValue
if qual != nilStr {
qualWrapper = &wrappers.BytesValue{Value: []byte(qual)}
} else {
qualWrapper = nil
}
return &btspb.ReadRowsResponse_CellChunk{
RowKey: rkWrapper,
FamilyName: fmWrapper,
Qualifier: qualWrapper,
TimestampMicros: ts,
Value: []byte(val),
ValueSize: size,
RowStatus: &btspb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: commit}}
}
// ccData returns a CellChunk with only a value and size
func ccData(val string, size int32, commit bool) *btspb.ReadRowsResponse_CellChunk {
return cc(nilStr, nilStr, nilStr, 0, val, size, commit)
}
// ccReset returns a CellChunk with RestRow set to true
func ccReset() *btspb.ReadRowsResponse_CellChunk {
return &btspb.ReadRowsResponse_CellChunk{
RowStatus: &btspb.ReadRowsResponse_CellChunk_ResetRow{ResetRow: true}}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment