feat: load model from oci image

This commit is contained in:
2023-05-05 17:07:29 +02:00
parent b57698380e
commit 9fb01f7be9
441 changed files with 61395 additions and 15356 deletions

View File

@@ -0,0 +1,88 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cas
import (
"bytes"
"context"
"fmt"
"io"
"sync"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
contentpkg "oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/descriptor"
)
// Memory is a memory based CAS.
type Memory struct {
content sync.Map // map[descriptor.Descriptor][]byte
}
// NewMemory creates a new Memory CAS.
func NewMemory() *Memory {
return &Memory{}
}
// Fetch fetches the content identified by the descriptor.
func (m *Memory) Fetch(_ context.Context, target ocispec.Descriptor) (io.ReadCloser, error) {
key := descriptor.FromOCI(target)
content, exists := m.content.Load(key)
if !exists {
return nil, fmt.Errorf("%s: %s: %w", key.Digest, key.MediaType, errdef.ErrNotFound)
}
return io.NopCloser(bytes.NewReader(content.([]byte))), nil
}
// Push pushes the content, matching the expected descriptor.
func (m *Memory) Push(_ context.Context, expected ocispec.Descriptor, content io.Reader) error {
key := descriptor.FromOCI(expected)
// check if the content exists in advance to avoid reading from the content.
if _, exists := m.content.Load(key); exists {
return fmt.Errorf("%s: %s: %w", key.Digest, key.MediaType, errdef.ErrAlreadyExists)
}
// read and try to store the content.
value, err := contentpkg.ReadAll(content, expected)
if err != nil {
return err
}
if _, exists := m.content.LoadOrStore(key, value); exists {
return fmt.Errorf("%s: %s: %w", key.Digest, key.MediaType, errdef.ErrAlreadyExists)
}
return nil
}
// Exists returns true if the described content exists.
func (m *Memory) Exists(_ context.Context, target ocispec.Descriptor) (bool, error) {
key := descriptor.FromOCI(target)
_, exists := m.content.Load(key)
return exists, nil
}
// Map dumps the memory into a built-in map structure.
// Like other operations, calling Map() is go-routine safe. However, it does not
// necessarily correspond to any consistent snapshot of the storage contents.
func (m *Memory) Map() map[descriptor.Descriptor][]byte {
res := make(map[descriptor.Descriptor][]byte)
m.content.Range(func(key, value interface{}) bool {
res[key.(descriptor.Descriptor)] = value.([]byte)
return true
})
return res
}

View File

@@ -0,0 +1,125 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cas
import (
"context"
"io"
"sync"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/internal/ioutil"
)
// Proxy is a caching proxy for the storage.
// The first fetch call of a described content will read from the remote and
// cache the fetched content.
// The subsequent fetch call will read from the local cache.
type Proxy struct {
content.ReadOnlyStorage
Cache content.Storage
StopCaching bool
}
// NewProxy creates a proxy for the `base` storage, using the `cache` storage as
// the cache.
func NewProxy(base content.ReadOnlyStorage, cache content.Storage) *Proxy {
return &Proxy{
ReadOnlyStorage: base,
Cache: cache,
}
}
// NewProxyWithLimit creates a proxy for the `base` storage, using the `cache`
// storage with a push size limit as the cache.
func NewProxyWithLimit(base content.ReadOnlyStorage, cache content.Storage, pushLimit int64) *Proxy {
limitedCache := content.LimitStorage(cache, pushLimit)
return &Proxy{
ReadOnlyStorage: base,
Cache: limitedCache,
}
}
// Fetch fetches the content identified by the descriptor.
func (p *Proxy) Fetch(ctx context.Context, target ocispec.Descriptor) (io.ReadCloser, error) {
if p.StopCaching {
return p.FetchCached(ctx, target)
}
rc, err := p.Cache.Fetch(ctx, target)
if err == nil {
return rc, nil
}
rc, err = p.ReadOnlyStorage.Fetch(ctx, target)
if err != nil {
return nil, err
}
pr, pw := io.Pipe()
var wg sync.WaitGroup
wg.Add(1)
var pushErr error
go func() {
defer wg.Done()
pushErr = p.Cache.Push(ctx, target, pr)
if pushErr != nil {
pr.CloseWithError(pushErr)
}
}()
closer := ioutil.CloserFunc(func() error {
rcErr := rc.Close()
if err := pw.Close(); err != nil {
return err
}
wg.Wait()
if pushErr != nil {
return pushErr
}
return rcErr
})
return struct {
io.Reader
io.Closer
}{
Reader: io.TeeReader(rc, pw),
Closer: closer,
}, nil
}
// FetchCached fetches the content identified by the descriptor.
// If the content is not cached, it will be fetched from the remote without
// caching.
func (p *Proxy) FetchCached(ctx context.Context, target ocispec.Descriptor) (io.ReadCloser, error) {
exists, err := p.Cache.Exists(ctx, target)
if err != nil {
return nil, err
}
if exists {
return p.Cache.Fetch(ctx, target)
}
return p.ReadOnlyStorage.Fetch(ctx, target)
}
// Exists returns true if the described content exists.
func (p *Proxy) Exists(ctx context.Context, target ocispec.Descriptor) (bool, error) {
exists, err := p.Cache.Exists(ctx, target)
if err == nil && exists {
return true, nil
}
return p.ReadOnlyStorage.Exists(ctx, target)
}

View File

@@ -0,0 +1,35 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package set
// Set represents a set data structure.
type Set[T comparable] map[T]struct{}
// New returns an initialized set.
func New[T comparable]() Set[T] {
return make(Set[T])
}
// Add adds item into the set s.
func (s Set[T]) Add(item T) {
s[item] = struct{}{}
}
// Contains returns true if the set s contains item.
func (s Set[T]) Contains(item T) bool {
_, ok := s[item]
return ok
}

View File

@@ -0,0 +1,55 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package copyutil
import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// NodeInfo represents information of a node that is being visited in
// ExtendedCopy.
type NodeInfo struct {
// Node represents a node in the graph.
Node ocispec.Descriptor
// Depth represents the depth of the node in the graph.
Depth int
}
// Stack represents a stack data structure that is used in ExtendedCopy for
// storing node information.
type Stack []NodeInfo
// IsEmpty returns true if the stack is empty, otherwise returns false.
func (s *Stack) IsEmpty() bool {
return len(*s) == 0
}
// Push pushes an item to the stack.
func (s *Stack) Push(i NodeInfo) {
*s = append(*s, i)
}
// Pop pops the top item out of the stack.
func (s *Stack) Pop() (NodeInfo, bool) {
if s.IsEmpty() {
return NodeInfo{}, false
}
last := len(*s) - 1
top := (*s)[last]
*s = (*s)[:last]
return top, true
}

View File

@@ -0,0 +1,88 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package descriptor
import (
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/internal/docker"
)
// DefaultMediaType is the media type used when no media type is specified.
const DefaultMediaType string = "application/octet-stream"
// Descriptor contains the minimun information to describe the disposition of
// targeted content.
// Since it only has strings and integers, Descriptor is a comparable struct.
type Descriptor struct {
// MediaType is the media type of the object this schema refers to.
MediaType string `json:"mediaType,omitempty"`
// Digest is the digest of the targeted content.
Digest digest.Digest `json:"digest"`
// Size specifies the size in bytes of the blob.
Size int64 `json:"size"`
}
// Empty is an empty descriptor
var Empty Descriptor
// FromOCI shrinks the OCI descriptor to the minimum.
func FromOCI(desc ocispec.Descriptor) Descriptor {
return Descriptor{
MediaType: desc.MediaType,
Digest: desc.Digest,
Size: desc.Size,
}
}
// IsForeignLayer checks if a descriptor describes a foreign layer.
func IsForeignLayer(desc ocispec.Descriptor) bool {
switch desc.MediaType {
case ocispec.MediaTypeImageLayerNonDistributable,
ocispec.MediaTypeImageLayerNonDistributableGzip,
ocispec.MediaTypeImageLayerNonDistributableZstd,
docker.MediaTypeForeignLayer:
return true
default:
return false
}
}
// IsManifest checks if a descriptor describes a manifest.
func IsManifest(desc ocispec.Descriptor) bool {
switch desc.MediaType {
case docker.MediaTypeManifest,
docker.MediaTypeManifestList,
ocispec.MediaTypeImageManifest,
ocispec.MediaTypeImageIndex,
ocispec.MediaTypeArtifactManifest:
return true
default:
return false
}
}
// Plain returns a plain descriptor that contains only MediaType, Digest and
// Size.
func Plain(desc ocispec.Descriptor) ocispec.Descriptor {
return ocispec.Descriptor{
MediaType: desc.MediaType,
Digest: desc.Digest,
Size: desc.Size,
}
}

View File

@@ -0,0 +1,24 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
// docker media types
const (
MediaTypeConfig = "application/vnd.docker.container.image.v1+json"
MediaTypeManifestList = "application/vnd.docker.distribution.manifest.list.v2+json"
MediaTypeManifest = "application/vnd.docker.distribution.manifest.v2+json"
MediaTypeForeignLayer = "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip"
)

View File

@@ -0,0 +1,134 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package graph
import (
"context"
"errors"
"sync"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/status"
"oras.land/oras-go/v2/internal/syncutil"
)
// Memory is a memory based PredecessorFinder.
type Memory struct {
predecessors sync.Map // map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
indexed sync.Map // map[descriptor.Descriptor]any
}
// NewMemory creates a new memory PredecessorFinder.
func NewMemory() *Memory {
return &Memory{}
}
// Index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) Index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
successors, err := content.Successors(ctx, fetcher, node)
if err != nil {
return err
}
m.index(ctx, node, successors)
return nil
}
// Index indexes predecessors for all the successors of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) IndexAll(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
// track content status
tracker := status.NewTracker()
var fn syncutil.GoFunc[ocispec.Descriptor]
fn = func(ctx context.Context, region *syncutil.LimitedRegion, desc ocispec.Descriptor) error {
// skip the node if other go routine is working on it
_, committed := tracker.TryCommit(desc)
if !committed {
return nil
}
// skip the node if it has been indexed
key := descriptor.FromOCI(desc)
_, exists := m.indexed.Load(key)
if exists {
return nil
}
successors, err := content.Successors(ctx, fetcher, desc)
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
// skip the node if it does not exist
return nil
}
return err
}
m.index(ctx, desc, successors)
m.indexed.Store(key, nil)
if len(successors) > 0 {
// traverse and index successors
return syncutil.Go(ctx, nil, fn, successors...)
}
return nil
}
return syncutil.Go(ctx, nil, fn, node)
}
// Predecessors returns the nodes directly pointing to the current node.
// Predecessors returns nil without error if the node does not exists in the
// store.
// Like other operations, calling Predecessors() is go-routine safe. However,
// it does not necessarily correspond to any consistent snapshot of the stored
// contents.
func (m *Memory) Predecessors(_ context.Context, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
key := descriptor.FromOCI(node)
value, exists := m.predecessors.Load(key)
if !exists {
return nil, nil
}
predecessors := value.(*sync.Map)
var res []ocispec.Descriptor
predecessors.Range(func(key, value interface{}) bool {
res = append(res, value.(ocispec.Descriptor))
return true
})
return res, nil
}
// index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) index(ctx context.Context, node ocispec.Descriptor, successors []ocispec.Descriptor) {
if len(successors) == 0 {
return
}
predecessorKey := descriptor.FromOCI(node)
for _, successor := range successors {
successorKey := descriptor.FromOCI(successor)
value, _ := m.predecessors.LoadOrStore(successorKey, &sync.Map{})
predecessors := value.(*sync.Map)
predecessors.Store(predecessorKey, node)
}
}

View File

@@ -0,0 +1,116 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package httputil
import (
"errors"
"fmt"
"io"
"net/http"
)
// Client is an interface for a HTTP client.
// This interface is defined inside this package to prevent potential import
// loop.
type Client interface {
// Do sends an HTTP request and returns an HTTP response.
Do(*http.Request) (*http.Response, error)
}
// readSeekCloser seeks http body by starting new connections.
type readSeekCloser struct {
client Client
req *http.Request
rc io.ReadCloser
size int64
offset int64
closed bool
}
// NewReadSeekCloser returns a seeker to make the HTTP response seekable.
// Callers should ensure that the server supports Range request.
func NewReadSeekCloser(client Client, req *http.Request, respBody io.ReadCloser, size int64) io.ReadSeekCloser {
return &readSeekCloser{
client: client,
req: req,
rc: respBody,
size: size,
}
}
// Read reads the content body and counts offset.
func (rsc *readSeekCloser) Read(p []byte) (n int, err error) {
if rsc.closed {
return 0, errors.New("read: already closed")
}
n, err = rsc.rc.Read(p)
rsc.offset += int64(n)
return
}
// Seek starts a new connection to the remote for reading if position changes.
func (rsc *readSeekCloser) Seek(offset int64, whence int) (int64, error) {
if rsc.closed {
return 0, errors.New("seek: already closed")
}
switch whence {
case io.SeekCurrent:
offset += rsc.offset
case io.SeekStart:
// no-op
case io.SeekEnd:
offset += rsc.size
default:
return 0, errors.New("seek: invalid whence")
}
if offset < 0 {
return 0, errors.New("seek: an attempt was made to move the pointer before the beginning of the content")
}
if offset == rsc.offset {
return offset, nil
}
if offset >= rsc.size {
rsc.rc.Close()
rsc.rc = http.NoBody
rsc.offset = offset
return offset, nil
}
req := rsc.req.Clone(rsc.req.Context())
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, rsc.size-1))
resp, err := rsc.client.Do(req)
if err != nil {
return 0, fmt.Errorf("seek: %s %q: %w", req.Method, req.URL, err)
}
if resp.StatusCode != http.StatusPartialContent {
resp.Body.Close()
return 0, fmt.Errorf("seek: %s %q: unexpected status code %d", resp.Request.Method, resp.Request.URL, resp.StatusCode)
}
rsc.rc.Close()
rsc.rc = resp.Body
rsc.offset = offset
return offset, nil
}
// Close closes the content body.
func (rsc *readSeekCloser) Close() error {
if rsc.closed {
return nil
}
rsc.closed = true
return rsc.rc.Close()
}

View File

@@ -0,0 +1,24 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package interfaces
import "oras.land/oras-go/v2/registry"
// ReferenceParser provides reference parsing.
type ReferenceParser interface {
// ParseReference parses a reference to a fully qualified reference.
ParseReference(reference string) (registry.Reference, error)
}

View File

@@ -0,0 +1,58 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"fmt"
"io"
"reflect"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
)
// CloserFunc is the basic Close method defined in io.Closer.
type CloserFunc func() error
// Close performs close operation by the CloserFunc.
func (fn CloserFunc) Close() error {
return fn()
}
// CopyBuffer copies from src to dst through the provided buffer
// until either EOF is reached on src, or an error occurs.
// The copied content is verified against the size and the digest.
func CopyBuffer(dst io.Writer, src io.Reader, buf []byte, desc ocispec.Descriptor) error {
// verify while copying
vr := content.NewVerifyReader(src, desc)
if _, err := io.CopyBuffer(dst, vr, buf); err != nil {
return fmt.Errorf("copy failed: %w", err)
}
return vr.Verify()
}
// nopCloserType is the type of `io.NopCloser()`.
var nopCloserType = reflect.TypeOf(io.NopCloser(nil))
// UnwrapNopCloser unwraps the reader wrapped by `io.NopCloser()`.
// Similar implementation can be found in the built-in package `net/http`.
// Reference: https://github.com/golang/go/blob/go1.17.6/src/net/http/transfer.go#L423-L425
func UnwrapNopCloser(rc io.Reader) io.Reader {
if reflect.TypeOf(rc) == nopCloserType {
return reflect.ValueOf(rc).Field(0).Interface().(io.Reader)
}
return rc
}

View File

@@ -0,0 +1,136 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package platform
import (
"context"
"encoding/json"
"fmt"
"io"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/docker"
)
// Match checks whether the current platform matches the target platform.
// Match will return true if all of the following conditions are met.
// - Architecture and OS exactly match.
// - Variant and OSVersion exactly match if target platform provided.
// - OSFeatures of the target platform are the subsets of the OSFeatures
// array of the current platform.
//
// Note: Variant, OSVersion and OSFeatures are optional fields, will skip
// the comparison if the target platform does not provide specfic value.
func Match(got *ocispec.Platform, want *ocispec.Platform) bool {
if got.Architecture != want.Architecture || got.OS != want.OS {
return false
}
if want.OSVersion != "" && got.OSVersion != want.OSVersion {
return false
}
if want.Variant != "" && got.Variant != want.Variant {
return false
}
if len(want.OSFeatures) != 0 && !isSubset(want.OSFeatures, got.OSFeatures) {
return false
}
return true
}
// isSubset returns true if all items in slice A are present in slice B.
func isSubset(a, b []string) bool {
set := make(map[string]bool, len(b))
for _, v := range b {
set[v] = true
}
for _, v := range a {
if _, ok := set[v]; !ok {
return false
}
}
return true
}
// SelectManifest implements platform filter and returns the descriptor of the
// first matched manifest if the root is a manifest list. If the root is a
// manifest, then return the root descriptor if platform matches.
func SelectManifest(ctx context.Context, src content.ReadOnlyStorage, root ocispec.Descriptor, p *ocispec.Platform) (ocispec.Descriptor, error) {
switch root.MediaType {
case docker.MediaTypeManifestList, ocispec.MediaTypeImageIndex:
manifests, err := content.Successors(ctx, src, root)
if err != nil {
return ocispec.Descriptor{}, err
}
// platform filter
for _, m := range manifests {
if Match(m.Platform, p) {
return m, nil
}
}
return ocispec.Descriptor{}, fmt.Errorf("%s: %w: no matching manifest was found in the manifest list", root.Digest, errdef.ErrNotFound)
case docker.MediaTypeManifest, ocispec.MediaTypeImageManifest:
descs, err := content.Successors(ctx, src, root)
if err != nil {
return ocispec.Descriptor{}, err
}
configMediaType := docker.MediaTypeConfig
if root.MediaType == ocispec.MediaTypeImageManifest {
configMediaType = ocispec.MediaTypeImageConfig
}
cfgPlatform, err := getPlatformFromConfig(ctx, src, descs[0], configMediaType)
if err != nil {
return ocispec.Descriptor{}, err
}
if Match(cfgPlatform, p) {
return root, nil
}
return ocispec.Descriptor{}, fmt.Errorf("%s: %w: platform in manifest does not match target platform", root.Digest, errdef.ErrNotFound)
default:
return ocispec.Descriptor{}, fmt.Errorf("%s: %s: %w", root.Digest, root.MediaType, errdef.ErrUnsupported)
}
}
// getPlatformFromConfig returns a platform object which is made up from the
// fields in config blob.
func getPlatformFromConfig(ctx context.Context, src content.ReadOnlyStorage, desc ocispec.Descriptor, targetConfigMediaType string) (*ocispec.Platform, error) {
if desc.MediaType != targetConfigMediaType {
return nil, fmt.Errorf("fail to recognize platform from unknown config %s: expect %s", desc.MediaType, targetConfigMediaType)
}
rc, err := src.Fetch(ctx, desc)
if err != nil {
return nil, err
}
defer rc.Close()
var platform ocispec.Platform
if err = json.NewDecoder(rc).Decode(&platform); err != nil && err != io.EOF {
return nil, err
}
return &platform, nil
}

View File

@@ -0,0 +1,29 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registryutil
import (
"context"
"oras.land/oras-go/v2/registry"
"oras.land/oras-go/v2/registry/remote/auth"
)
// WithScopeHint adds a hinted scope to the context.
func WithScopeHint(ctx context.Context, ref registry.Reference, actions ...string) context.Context {
scope := auth.ScopeRepository(ref.Repository, actions...)
return auth.AppendScopes(ctx, scope)
}

View File

@@ -0,0 +1,102 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registryutil
import (
"context"
"io"
"sync"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/internal/cas"
"oras.land/oras-go/v2/internal/ioutil"
"oras.land/oras-go/v2/registry"
)
// ReferenceStorage represents a CAS that supports registry.ReferenceFetcher.
type ReferenceStorage interface {
content.ReadOnlyStorage
registry.ReferenceFetcher
}
// Proxy is a caching proxy dedicated for registry.ReferenceFetcher.
// The first fetch call of a described content will read from the remote and
// cache the fetched content.
// The subsequent fetch call will read from the local cache.
type Proxy struct {
registry.ReferenceFetcher
*cas.Proxy
}
// NewProxy creates a proxy for the `base` ReferenceStorage, using the `cache`
// storage as the cache.
func NewProxy(base ReferenceStorage, cache content.Storage) *Proxy {
return &Proxy{
ReferenceFetcher: base,
Proxy: cas.NewProxy(base, cache),
}
}
// FetchReference fetches the content identified by the reference from the
// remote and cache the fetched content.
func (p *Proxy) FetchReference(ctx context.Context, reference string) (ocispec.Descriptor, io.ReadCloser, error) {
target, rc, err := p.ReferenceFetcher.FetchReference(ctx, reference)
if err != nil {
return ocispec.Descriptor{}, nil, err
}
// skip caching if the content already exists in cache
exists, err := p.Cache.Exists(ctx, target)
if err != nil {
return ocispec.Descriptor{}, nil, err
}
if exists {
return target, rc, nil
}
// cache content while reading
pr, pw := io.Pipe()
var wg sync.WaitGroup
wg.Add(1)
var pushErr error
go func() {
defer wg.Done()
pushErr = p.Cache.Push(ctx, target, pr)
if pushErr != nil {
pr.CloseWithError(pushErr)
}
}()
closer := ioutil.CloserFunc(func() error {
rcErr := rc.Close()
if err := pw.Close(); err != nil {
return err
}
wg.Wait()
if pushErr != nil {
return pushErr
}
return rcErr
})
return target, struct {
io.Reader
io.Closer
}{
Reader: io.TeeReader(rc, pw),
Closer: closer,
}, nil
}

View File

@@ -0,0 +1,61 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resolver
import (
"context"
"sync"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/errdef"
)
// Memory is a memory based resolver.
type Memory struct {
index sync.Map // map[string]ocispec.Descriptor
}
// NewMemory creates a new Memory resolver.
func NewMemory() *Memory {
return &Memory{}
}
// Resolve resolves a reference to a descriptor.
func (m *Memory) Resolve(_ context.Context, reference string) (ocispec.Descriptor, error) {
desc, ok := m.index.Load(reference)
if !ok {
return ocispec.Descriptor{}, errdef.ErrNotFound
}
return desc.(ocispec.Descriptor), nil
}
// Tag tags a descriptor with a reference string.
func (m *Memory) Tag(_ context.Context, desc ocispec.Descriptor, reference string) error {
m.index.Store(reference, desc)
return nil
}
// Map dumps the memory into a built-in map structure.
// Like other operations, calling Map() is go-routine safe. However, it does not
// necessarily correspond to any consistent snapshot of the storage contents.
func (m *Memory) Map() map[string]ocispec.Descriptor {
res := make(map[string]ocispec.Descriptor)
m.index.Range(func(key, value interface{}) bool {
res[key.(string)] = value.(ocispec.Descriptor)
return true
})
return res
}

View File

@@ -0,0 +1,24 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package slices
// Clone returns a shallow copy of the slice.
func Clone[S ~[]E, E any](s S) S {
if s == nil {
return nil
}
return append(make(S, 0, len(s)), s...)
}

View File

@@ -0,0 +1,43 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package status
import (
"sync"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/internal/descriptor"
)
// Tracker tracks content status described by a descriptor.
type Tracker struct {
status sync.Map // map[descriptor.Descriptor]chan struct{}
}
// NewTracker creates a new content status tracker.
func NewTracker() *Tracker {
return &Tracker{}
}
// TryCommit tries to commit the work for the target descriptor.
// Returns true if committed. A channel is also returned for sending
// notifications. Once the work is done, the channel should be closed.
// Returns false if the work is done or still in progress.
func (t *Tracker) TryCommit(target ocispec.Descriptor) (chan struct{}, bool) {
key := descriptor.FromOCI(target)
status, exists := t.status.LoadOrStore(key, make(chan struct{}))
return status.(chan struct{}), !exists
}

View File

@@ -0,0 +1,84 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syncutil
import (
"context"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
// LimitedRegion provides a way to bound concurrent access to a code block.
type LimitedRegion struct {
ctx context.Context
limiter *semaphore.Weighted
ended bool
}
// LimitRegion creates a new LimitedRegion.
func LimitRegion(ctx context.Context, limiter *semaphore.Weighted) *LimitedRegion {
if limiter == nil {
return nil
}
return &LimitedRegion{
ctx: ctx,
limiter: limiter,
ended: true,
}
}
// Start starts the region with concurrency limit.
func (lr *LimitedRegion) Start() error {
if lr == nil || !lr.ended {
return nil
}
if err := lr.limiter.Acquire(lr.ctx, 1); err != nil {
return err
}
lr.ended = false
return nil
}
// End ends the region with concurrency limit.
func (lr *LimitedRegion) End() {
if lr == nil || lr.ended {
return
}
lr.limiter.Release(1)
lr.ended = true
}
// GoFunc represents a function that can be invoked by Go.
type GoFunc[T any] func(ctx context.Context, region *LimitedRegion, t T) error
// Go concurrently invokes fn on items.
func Go[T any](ctx context.Context, limiter *semaphore.Weighted, fn GoFunc[T], items ...T) error {
eg, egCtx := errgroup.WithContext(ctx)
for _, item := range items {
region := LimitRegion(ctx, limiter)
if err := region.Start(); err != nil {
return err
}
eg.Go(func(t T) func() error {
return func() error {
defer region.End()
return fn(egCtx, region, t)
}
}(item))
}
return eg.Wait()
}

View File

@@ -0,0 +1,67 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syncutil
import (
"context"
"golang.org/x/sync/errgroup"
)
// A LimitedGroup is a collection of goroutines working on subtasks that are part of
// the same overall task.
type LimitedGroup struct {
grp *errgroup.Group
ctx context.Context
}
// LimitGroup returns a new LimitedGroup and an associated Context derived from ctx.
//
// The number of active goroutines in this group is limited to the given limit.
// A negative value indicates no limit.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func LimitGroup(ctx context.Context, limit int) (*LimitedGroup, context.Context) {
grp, ctx := errgroup.WithContext(ctx)
grp.SetLimit(limit)
return &LimitedGroup{grp: grp, ctx: ctx}, ctx
}
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context.
// After which, any subsequent calls to Go will not execute their given function.
// The error will be returned by Wait.
func (g *LimitedGroup) Go(f func() error) {
g.grp.Go(func() error {
select {
case <-g.ctx.Done():
return g.ctx.Err()
default:
return f()
}
})
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *LimitedGroup) Wait() error {
return g.grp.Wait()
}

View File

@@ -0,0 +1,140 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syncutil
import "sync"
// mergeStatus represents the merge status of an item.
type mergeStatus struct {
// main indicates if items are being merged by the current go-routine.
main bool
// err represents the error of the merge operation.
err error
}
// Merge represents merge operations on items.
// The state transfer is shown as below:
//
// +----------+
// | Start +--------+-------------+
// +----+-----+ | |
// | | |
// v v v
// +----+-----+ +----+----+ +----+----+
// +-------+ Prepare +<--+ Pending +-->+ Waiting |
// | +----+-----+ +---------+ +----+----+
// | | |
// | v |
// | + ---+---- + |
// On Error | Resolve | |
// | + ---+---- + |
// | | |
// | v |
// | +----+-----+ |
// +------>+ Complete +<---------------------+
// +----+-----+
// |
// v
// +----+-----+
// | End |
// +----------+
type Merge[T any] struct {
lock sync.Mutex
committed bool
items []T
status chan mergeStatus
pending []T
pendingStatus chan mergeStatus
}
// Do merges concurrent operations of items into a single call of prepare and
// resolve.
// If Do is called multiple times concurrently, only one of the calls will be
// selected to invoke prepare and resolve.
func (m *Merge[T]) Do(item T, prepare func() error, resolve func(items []T) error) error {
status := <-m.assign(item)
if status.main {
err := prepare()
items := m.commit()
if err == nil {
err = resolve(items)
}
m.complete(err)
return err
}
return status.err
}
// assign adds a new item into the item list.
func (m *Merge[T]) assign(item T) <-chan mergeStatus {
m.lock.Lock()
defer m.lock.Unlock()
if m.committed {
if m.pendingStatus == nil {
m.pendingStatus = make(chan mergeStatus, 1)
}
m.pending = append(m.pending, item)
return m.pendingStatus
}
if m.status == nil {
m.status = make(chan mergeStatus, 1)
m.status <- mergeStatus{main: true}
}
m.items = append(m.items, item)
return m.status
}
// commit closes the assignment window, and the assigned items will be ready
// for resolve.
func (m *Merge[T]) commit() []T {
m.lock.Lock()
defer m.lock.Unlock()
m.committed = true
return m.items
}
// complete completes the previous merge, and moves the pending items to the
// stage for the next merge.
func (m *Merge[T]) complete(err error) {
// notify results
if err == nil {
close(m.status)
} else {
remaining := len(m.items) - 1
status := m.status
for remaining > 0 {
status <- mergeStatus{err: err}
remaining--
}
}
// move pending items to the stage
m.lock.Lock()
defer m.lock.Unlock()
m.committed = false
m.items = m.pending
m.status = m.pendingStatus
m.pending = nil
m.pendingStatus = nil
if m.status != nil {
m.status <- mergeStatus{main: true}
}
}

View File

@@ -0,0 +1,70 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syncutil
import "context"
// Once is an object that will perform exactly one action.
// Unlike sync.Once, this Once allowes the action to have return values.
type Once struct {
result interface{}
err error
status chan bool
}
// NewOnce creates a new Once instance.
func NewOnce() *Once {
status := make(chan bool, 1)
status <- true
return &Once{
status: status,
}
}
// Do calls the function f if and only if Do is being called first time or all
// previous function calls are cancelled, deadline exceeded, or panicking.
// When `once.Do(ctx, f)` is called multiple times, the return value of the
// first call of the function f is stored, and is directly returned for other
// calls.
// Besides the return value of the function f, including the error, Do returns
// true if the function f passed is called first and is not cancelled, deadline
// exceeded, or panicking. Otherwise, returns false.
func (o *Once) Do(ctx context.Context, f func() (interface{}, error)) (bool, interface{}, error) {
defer func() {
if r := recover(); r != nil {
o.status <- true
panic(r)
}
}()
for {
select {
case inProgress := <-o.status:
if !inProgress {
return false, o.result, o.err
}
result, err := f()
if err == context.Canceled || err == context.DeadlineExceeded {
o.status <- true
return false, nil, err
}
o.result, o.err = result, err
close(o.status)
return true, result, err
case <-ctx.Done():
return false, nil, ctx.Err()
}
}
}

View File

@@ -0,0 +1,64 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syncutil
import "sync"
// poolItem represents an item in Pool.
type poolItem[T any] struct {
value T
refCount int
}
// Pool is a scalable pool with items identified by keys.
type Pool[T any] struct {
// New optionally specifies a function to generate a value when Get would
// otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() T
lock sync.Mutex
items map[any]*poolItem[T]
}
// Get gets the value identified by key.
// The caller should invoke the returned function after using the returned item.
func (p *Pool[T]) Get(key any) (*T, func()) {
p.lock.Lock()
defer p.lock.Unlock()
item, ok := p.items[key]
if !ok {
if p.items == nil {
p.items = make(map[any]*poolItem[T])
}
item = &poolItem[T]{}
if p.New != nil {
item.value = p.New()
}
p.items[key] = item
}
item.refCount++
return &item.value, func() {
p.lock.Lock()
defer p.lock.Unlock()
item.refCount--
if item.refCount <= 0 {
delete(p.items, key)
}
}
}