WIP
This commit is contained in:
144
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/debug.go
generated
vendored
Normal file
144
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/debug.go
generated
vendored
Normal file
@@ -0,0 +1,144 @@
|
||||
package eventstream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type decodedMessage struct {
|
||||
rawMessage
|
||||
Headers decodedHeaders `json:"headers"`
|
||||
}
|
||||
type jsonMessage struct {
|
||||
Length json.Number `json:"total_length"`
|
||||
HeadersLen json.Number `json:"headers_length"`
|
||||
PreludeCRC json.Number `json:"prelude_crc"`
|
||||
Headers decodedHeaders `json:"headers"`
|
||||
Payload []byte `json:"payload"`
|
||||
CRC json.Number `json:"message_crc"`
|
||||
}
|
||||
|
||||
func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) {
|
||||
var jsonMsg jsonMessage
|
||||
if err = json.Unmarshal(b, &jsonMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.Length, err = numAsUint32(jsonMsg.Length)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.HeadersLen, err = numAsUint32(jsonMsg.HeadersLen)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.PreludeCRC, err = numAsUint32(jsonMsg.PreludeCRC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Headers = jsonMsg.Headers
|
||||
d.Payload = jsonMsg.Payload
|
||||
d.CRC, err = numAsUint32(jsonMsg.CRC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *decodedMessage) MarshalJSON() ([]byte, error) {
|
||||
jsonMsg := jsonMessage{
|
||||
Length: json.Number(strconv.Itoa(int(d.Length))),
|
||||
HeadersLen: json.Number(strconv.Itoa(int(d.HeadersLen))),
|
||||
PreludeCRC: json.Number(strconv.Itoa(int(d.PreludeCRC))),
|
||||
Headers: d.Headers,
|
||||
Payload: d.Payload,
|
||||
CRC: json.Number(strconv.Itoa(int(d.CRC))),
|
||||
}
|
||||
|
||||
return json.Marshal(jsonMsg)
|
||||
}
|
||||
|
||||
func numAsUint32(n json.Number) (uint32, error) {
|
||||
v, err := n.Int64()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get int64 json number, %v", err)
|
||||
}
|
||||
|
||||
return uint32(v), nil
|
||||
}
|
||||
|
||||
func (d decodedMessage) Message() Message {
|
||||
return Message{
|
||||
Headers: Headers(d.Headers),
|
||||
Payload: d.Payload,
|
||||
}
|
||||
}
|
||||
|
||||
type decodedHeaders Headers
|
||||
|
||||
func (hs *decodedHeaders) UnmarshalJSON(b []byte) error {
|
||||
var jsonHeaders []struct {
|
||||
Name string `json:"name"`
|
||||
Type valueType `json:"type"`
|
||||
Value interface{} `json:"value"`
|
||||
}
|
||||
|
||||
decoder := json.NewDecoder(bytes.NewReader(b))
|
||||
decoder.UseNumber()
|
||||
if err := decoder.Decode(&jsonHeaders); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var headers Headers
|
||||
for _, h := range jsonHeaders {
|
||||
value, err := valueFromType(h.Type, h.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
headers.Set(h.Name, value)
|
||||
}
|
||||
*hs = decodedHeaders(headers)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func valueFromType(typ valueType, val interface{}) (Value, error) {
|
||||
switch typ {
|
||||
case trueValueType:
|
||||
return BoolValue(true), nil
|
||||
case falseValueType:
|
||||
return BoolValue(false), nil
|
||||
case int8ValueType:
|
||||
v, err := val.(json.Number).Int64()
|
||||
return Int8Value(int8(v)), err
|
||||
case int16ValueType:
|
||||
v, err := val.(json.Number).Int64()
|
||||
return Int16Value(int16(v)), err
|
||||
case int32ValueType:
|
||||
v, err := val.(json.Number).Int64()
|
||||
return Int32Value(int32(v)), err
|
||||
case int64ValueType:
|
||||
v, err := val.(json.Number).Int64()
|
||||
return Int64Value(v), err
|
||||
case bytesValueType:
|
||||
v, err := base64.StdEncoding.DecodeString(val.(string))
|
||||
return BytesValue(v), err
|
||||
case stringValueType:
|
||||
v, err := base64.StdEncoding.DecodeString(val.(string))
|
||||
return StringValue(string(v)), err
|
||||
case timestampValueType:
|
||||
v, err := val.(json.Number).Int64()
|
||||
return TimestampValue(timeFromEpochMilli(v)), err
|
||||
case uuidValueType:
|
||||
v, err := base64.StdEncoding.DecodeString(val.(string))
|
||||
var tv UUIDValue
|
||||
copy(tv[:], v)
|
||||
return tv, err
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown type, %s, %T", typ.String(), val))
|
||||
}
|
||||
}
|
||||
216
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/decode.go
generated
vendored
Normal file
216
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/decode.go
generated
vendored
Normal file
@@ -0,0 +1,216 @@
|
||||
package eventstream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
)
|
||||
|
||||
// Decoder provides decoding of an Event Stream messages.
|
||||
type Decoder struct {
|
||||
r io.Reader
|
||||
logger aws.Logger
|
||||
}
|
||||
|
||||
// NewDecoder initializes and returns a Decoder for decoding event
|
||||
// stream messages from the reader provided.
|
||||
func NewDecoder(r io.Reader, opts ...func(*Decoder)) *Decoder {
|
||||
d := &Decoder{
|
||||
r: r,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(d)
|
||||
}
|
||||
|
||||
return d
|
||||
}
|
||||
|
||||
// DecodeWithLogger adds a logger to be used by the decoder when decoding
|
||||
// stream events.
|
||||
func DecodeWithLogger(logger aws.Logger) func(*Decoder) {
|
||||
return func(d *Decoder) {
|
||||
d.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
// Decode attempts to decode a single message from the event stream reader.
|
||||
// Will return the event stream message, or error if Decode fails to read
|
||||
// the message from the stream.
|
||||
func (d *Decoder) Decode(payloadBuf []byte) (m Message, err error) {
|
||||
reader := d.r
|
||||
if d.logger != nil {
|
||||
debugMsgBuf := bytes.NewBuffer(nil)
|
||||
reader = io.TeeReader(reader, debugMsgBuf)
|
||||
defer func() {
|
||||
logMessageDecode(d.logger, debugMsgBuf, m, err)
|
||||
}()
|
||||
}
|
||||
|
||||
m, err = Decode(reader, payloadBuf)
|
||||
|
||||
return m, err
|
||||
}
|
||||
|
||||
// Decode attempts to decode a single message from the event stream reader.
|
||||
// Will return the event stream message, or error if Decode fails to read
|
||||
// the message from the reader.
|
||||
func Decode(reader io.Reader, payloadBuf []byte) (m Message, err error) {
|
||||
crc := crc32.New(crc32IEEETable)
|
||||
hashReader := io.TeeReader(reader, crc)
|
||||
|
||||
prelude, err := decodePrelude(hashReader, crc)
|
||||
if err != nil {
|
||||
return Message{}, err
|
||||
}
|
||||
|
||||
if prelude.HeadersLen > 0 {
|
||||
lr := io.LimitReader(hashReader, int64(prelude.HeadersLen))
|
||||
m.Headers, err = decodeHeaders(lr)
|
||||
if err != nil {
|
||||
return Message{}, err
|
||||
}
|
||||
}
|
||||
|
||||
if payloadLen := prelude.PayloadLen(); payloadLen > 0 {
|
||||
buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen)))
|
||||
if err != nil {
|
||||
return Message{}, err
|
||||
}
|
||||
m.Payload = buf
|
||||
}
|
||||
|
||||
msgCRC := crc.Sum32()
|
||||
if err := validateCRC(reader, msgCRC); err != nil {
|
||||
return Message{}, err
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func logMessageDecode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) {
|
||||
w := bytes.NewBuffer(nil)
|
||||
defer func() { logger.Log(w.String()) }()
|
||||
|
||||
fmt.Fprintf(w, "Raw message:\n%s\n",
|
||||
hex.Dump(msgBuf.Bytes()))
|
||||
|
||||
if decodeErr != nil {
|
||||
fmt.Fprintf(w, "Decode error: %v\n", decodeErr)
|
||||
return
|
||||
}
|
||||
|
||||
rawMsg, err := msg.rawMessage()
|
||||
if err != nil {
|
||||
fmt.Fprintf(w, "failed to create raw message, %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
decodedMsg := decodedMessage{
|
||||
rawMessage: rawMsg,
|
||||
Headers: decodedHeaders(msg.Headers),
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, "Decoded message:\n")
|
||||
encoder := json.NewEncoder(w)
|
||||
if err := encoder.Encode(decodedMsg); err != nil {
|
||||
fmt.Fprintf(w, "failed to generate decoded message, %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) {
|
||||
var p messagePrelude
|
||||
|
||||
var err error
|
||||
p.Length, err = decodeUint32(r)
|
||||
if err != nil {
|
||||
return messagePrelude{}, err
|
||||
}
|
||||
|
||||
p.HeadersLen, err = decodeUint32(r)
|
||||
if err != nil {
|
||||
return messagePrelude{}, err
|
||||
}
|
||||
|
||||
if err := p.ValidateLens(); err != nil {
|
||||
return messagePrelude{}, err
|
||||
}
|
||||
|
||||
preludeCRC := crc.Sum32()
|
||||
if err := validateCRC(r, preludeCRC); err != nil {
|
||||
return messagePrelude{}, err
|
||||
}
|
||||
|
||||
p.PreludeCRC = preludeCRC
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func decodePayload(buf []byte, r io.Reader) ([]byte, error) {
|
||||
w := bytes.NewBuffer(buf[0:0])
|
||||
|
||||
_, err := io.Copy(w, r)
|
||||
return w.Bytes(), err
|
||||
}
|
||||
|
||||
func decodeUint8(r io.Reader) (uint8, error) {
|
||||
type byteReader interface {
|
||||
ReadByte() (byte, error)
|
||||
}
|
||||
|
||||
if br, ok := r.(byteReader); ok {
|
||||
v, err := br.ReadByte()
|
||||
return uint8(v), err
|
||||
}
|
||||
|
||||
var b [1]byte
|
||||
_, err := io.ReadFull(r, b[:])
|
||||
return uint8(b[0]), err
|
||||
}
|
||||
func decodeUint16(r io.Reader) (uint16, error) {
|
||||
var b [2]byte
|
||||
bs := b[:]
|
||||
_, err := io.ReadFull(r, bs)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return binary.BigEndian.Uint16(bs), nil
|
||||
}
|
||||
func decodeUint32(r io.Reader) (uint32, error) {
|
||||
var b [4]byte
|
||||
bs := b[:]
|
||||
_, err := io.ReadFull(r, bs)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return binary.BigEndian.Uint32(bs), nil
|
||||
}
|
||||
func decodeUint64(r io.Reader) (uint64, error) {
|
||||
var b [8]byte
|
||||
bs := b[:]
|
||||
_, err := io.ReadFull(r, bs)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return binary.BigEndian.Uint64(bs), nil
|
||||
}
|
||||
|
||||
func validateCRC(r io.Reader, expect uint32) error {
|
||||
msgCRC, err := decodeUint32(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if msgCRC != expect {
|
||||
return ChecksumError{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
162
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/encode.go
generated
vendored
Normal file
162
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/encode.go
generated
vendored
Normal file
@@ -0,0 +1,162 @@
|
||||
package eventstream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
)
|
||||
|
||||
// Encoder provides EventStream message encoding.
|
||||
type Encoder struct {
|
||||
w io.Writer
|
||||
logger aws.Logger
|
||||
|
||||
headersBuf *bytes.Buffer
|
||||
}
|
||||
|
||||
// NewEncoder initializes and returns an Encoder to encode Event Stream
|
||||
// messages to an io.Writer.
|
||||
func NewEncoder(w io.Writer, opts ...func(*Encoder)) *Encoder {
|
||||
e := &Encoder{
|
||||
w: w,
|
||||
headersBuf: bytes.NewBuffer(nil),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(e)
|
||||
}
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
// EncodeWithLogger adds a logger to be used by the encode when decoding
|
||||
// stream events.
|
||||
func EncodeWithLogger(logger aws.Logger) func(*Encoder) {
|
||||
return func(d *Encoder) {
|
||||
d.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
// Encode encodes a single EventStream message to the io.Writer the Encoder
|
||||
// was created with. An error is returned if writing the message fails.
|
||||
func (e *Encoder) Encode(msg Message) (err error) {
|
||||
e.headersBuf.Reset()
|
||||
|
||||
writer := e.w
|
||||
if e.logger != nil {
|
||||
encodeMsgBuf := bytes.NewBuffer(nil)
|
||||
writer = io.MultiWriter(writer, encodeMsgBuf)
|
||||
defer func() {
|
||||
logMessageEncode(e.logger, encodeMsgBuf, msg, err)
|
||||
}()
|
||||
}
|
||||
|
||||
if err = EncodeHeaders(e.headersBuf, msg.Headers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
crc := crc32.New(crc32IEEETable)
|
||||
hashWriter := io.MultiWriter(writer, crc)
|
||||
|
||||
headersLen := uint32(e.headersBuf.Len())
|
||||
payloadLen := uint32(len(msg.Payload))
|
||||
|
||||
if err = encodePrelude(hashWriter, crc, headersLen, payloadLen); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if headersLen > 0 {
|
||||
if _, err = io.Copy(hashWriter, e.headersBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if payloadLen > 0 {
|
||||
if _, err = hashWriter.Write(msg.Payload); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
msgCRC := crc.Sum32()
|
||||
return binary.Write(writer, binary.BigEndian, msgCRC)
|
||||
}
|
||||
|
||||
func logMessageEncode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, encodeErr error) {
|
||||
w := bytes.NewBuffer(nil)
|
||||
defer func() { logger.Log(w.String()) }()
|
||||
|
||||
fmt.Fprintf(w, "Message to encode:\n")
|
||||
encoder := json.NewEncoder(w)
|
||||
if err := encoder.Encode(msg); err != nil {
|
||||
fmt.Fprintf(w, "Failed to get encoded message, %v\n", err)
|
||||
}
|
||||
|
||||
if encodeErr != nil {
|
||||
fmt.Fprintf(w, "Encode error: %v\n", encodeErr)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, "Raw message:\n%s\n", hex.Dump(msgBuf.Bytes()))
|
||||
}
|
||||
|
||||
func encodePrelude(w io.Writer, crc hash.Hash32, headersLen, payloadLen uint32) error {
|
||||
p := messagePrelude{
|
||||
Length: minMsgLen + headersLen + payloadLen,
|
||||
HeadersLen: headersLen,
|
||||
}
|
||||
if err := p.ValidateLens(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := binaryWriteFields(w, binary.BigEndian,
|
||||
p.Length,
|
||||
p.HeadersLen,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.PreludeCRC = crc.Sum32()
|
||||
err = binary.Write(w, binary.BigEndian, p.PreludeCRC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EncodeHeaders writes the header values to the writer encoded in the event
|
||||
// stream format. Returns an error if a header fails to encode.
|
||||
func EncodeHeaders(w io.Writer, headers Headers) error {
|
||||
for _, h := range headers {
|
||||
hn := headerName{
|
||||
Len: uint8(len(h.Name)),
|
||||
}
|
||||
copy(hn.Name[:hn.Len], h.Name)
|
||||
if err := hn.encode(w); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := h.Value.encode(w); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func binaryWriteFields(w io.Writer, order binary.ByteOrder, vs ...interface{}) error {
|
||||
for _, v := range vs {
|
||||
if err := binary.Write(w, order, v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
23
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/error.go
generated
vendored
Normal file
23
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/error.go
generated
vendored
Normal file
@@ -0,0 +1,23 @@
|
||||
package eventstream
|
||||
|
||||
import "fmt"
|
||||
|
||||
// LengthError provides the error for items being larger than a maximum length.
|
||||
type LengthError struct {
|
||||
Part string
|
||||
Want int
|
||||
Have int
|
||||
Value interface{}
|
||||
}
|
||||
|
||||
func (e LengthError) Error() string {
|
||||
return fmt.Sprintf("%s length invalid, %d/%d, %v",
|
||||
e.Part, e.Want, e.Have, e.Value)
|
||||
}
|
||||
|
||||
// ChecksumError provides the error for message checksum invalidation errors.
|
||||
type ChecksumError struct{}
|
||||
|
||||
func (e ChecksumError) Error() string {
|
||||
return "message checksum mismatch"
|
||||
}
|
||||
77
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/error.go
generated
vendored
Normal file
77
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/error.go
generated
vendored
Normal file
@@ -0,0 +1,77 @@
|
||||
package eventstreamapi
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type messageError struct {
|
||||
code string
|
||||
msg string
|
||||
}
|
||||
|
||||
func (e messageError) Code() string {
|
||||
return e.code
|
||||
}
|
||||
|
||||
func (e messageError) Message() string {
|
||||
return e.msg
|
||||
}
|
||||
|
||||
func (e messageError) Error() string {
|
||||
return fmt.Sprintf("%s: %s", e.code, e.msg)
|
||||
}
|
||||
|
||||
func (e messageError) OrigErr() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnceError wraps the behavior of recording an error
|
||||
// once and signal on a channel when this has occurred.
|
||||
// Signaling is done by closing of the channel.
|
||||
//
|
||||
// Type is safe for concurrent usage.
|
||||
type OnceError struct {
|
||||
mu sync.RWMutex
|
||||
err error
|
||||
ch chan struct{}
|
||||
}
|
||||
|
||||
// NewOnceError return a new OnceError
|
||||
func NewOnceError() *OnceError {
|
||||
return &OnceError{
|
||||
ch: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// Err acquires a read-lock and returns an
|
||||
// error if one has been set.
|
||||
func (e *OnceError) Err() error {
|
||||
e.mu.RLock()
|
||||
err := e.err
|
||||
e.mu.RUnlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// SetError acquires a write-lock and will set
|
||||
// the underlying error value if one has not been set.
|
||||
func (e *OnceError) SetError(err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
e.mu.Lock()
|
||||
if e.err == nil {
|
||||
e.err = err
|
||||
close(e.ch)
|
||||
}
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
// ErrorSet returns a channel that will be used to signal
|
||||
// that an error has been set. This channel will be closed
|
||||
// when the error value has been set for OnceError.
|
||||
func (e *OnceError) ErrorSet() <-chan struct{} {
|
||||
return e.ch
|
||||
}
|
||||
160
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/reader.go
generated
vendored
Normal file
160
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/reader.go
generated
vendored
Normal file
@@ -0,0 +1,160 @@
|
||||
package eventstreamapi
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/aws/aws-sdk-go/private/protocol"
|
||||
"github.com/aws/aws-sdk-go/private/protocol/eventstream"
|
||||
)
|
||||
|
||||
// Unmarshaler provides the interface for unmarshaling a EventStream
|
||||
// message into a SDK type.
|
||||
type Unmarshaler interface {
|
||||
UnmarshalEvent(protocol.PayloadUnmarshaler, eventstream.Message) error
|
||||
}
|
||||
|
||||
// EventReader provides reading from the EventStream of an reader.
|
||||
type EventReader struct {
|
||||
decoder *eventstream.Decoder
|
||||
|
||||
unmarshalerForEventType func(string) (Unmarshaler, error)
|
||||
payloadUnmarshaler protocol.PayloadUnmarshaler
|
||||
|
||||
payloadBuf []byte
|
||||
}
|
||||
|
||||
// NewEventReader returns a EventReader built from the reader and unmarshaler
|
||||
// provided. Use ReadStream method to start reading from the EventStream.
|
||||
func NewEventReader(
|
||||
decoder *eventstream.Decoder,
|
||||
payloadUnmarshaler protocol.PayloadUnmarshaler,
|
||||
unmarshalerForEventType func(string) (Unmarshaler, error),
|
||||
) *EventReader {
|
||||
return &EventReader{
|
||||
decoder: decoder,
|
||||
payloadUnmarshaler: payloadUnmarshaler,
|
||||
unmarshalerForEventType: unmarshalerForEventType,
|
||||
payloadBuf: make([]byte, 10*1024),
|
||||
}
|
||||
}
|
||||
|
||||
// ReadEvent attempts to read a message from the EventStream and return the
|
||||
// unmarshaled event value that the message is for.
|
||||
//
|
||||
// For EventStream API errors check if the returned error satisfies the
|
||||
// awserr.Error interface to get the error's Code and Message components.
|
||||
//
|
||||
// EventUnmarshalers called with EventStream messages must take copies of the
|
||||
// message's Payload. The payload will is reused between events read.
|
||||
func (r *EventReader) ReadEvent() (event interface{}, err error) {
|
||||
msg, err := r.decoder.Decode(r.payloadBuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
// Reclaim payload buffer for next message read.
|
||||
r.payloadBuf = msg.Payload[0:0]
|
||||
}()
|
||||
|
||||
typ, err := GetHeaderString(msg, MessageTypeHeader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch typ {
|
||||
case EventMessageType:
|
||||
return r.unmarshalEventMessage(msg)
|
||||
case ExceptionMessageType:
|
||||
return nil, r.unmarshalEventException(msg)
|
||||
case ErrorMessageType:
|
||||
return nil, r.unmarshalErrorMessage(msg)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown eventstream message type, %v", typ)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *EventReader) unmarshalEventMessage(
|
||||
msg eventstream.Message,
|
||||
) (event interface{}, err error) {
|
||||
eventType, err := GetHeaderString(msg, EventTypeHeader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ev, err := r.unmarshalerForEventType(eventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = ev.UnmarshalEvent(r.payloadUnmarshaler, msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ev, nil
|
||||
}
|
||||
|
||||
func (r *EventReader) unmarshalEventException(
|
||||
msg eventstream.Message,
|
||||
) (err error) {
|
||||
eventType, err := GetHeaderString(msg, ExceptionTypeHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ev, err := r.unmarshalerForEventType(eventType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = ev.UnmarshalEvent(r.payloadUnmarshaler, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
err, ok = ev.(error)
|
||||
if !ok {
|
||||
err = messageError{
|
||||
code: "SerializationError",
|
||||
msg: fmt.Sprintf(
|
||||
"event stream exception %s mapped to non-error %T, %v",
|
||||
eventType, ev, ev,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *EventReader) unmarshalErrorMessage(msg eventstream.Message) (err error) {
|
||||
var msgErr messageError
|
||||
|
||||
msgErr.code, err = GetHeaderString(msg, ErrorCodeHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgErr.msg, err = GetHeaderString(msg, ErrorMessageHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return msgErr
|
||||
}
|
||||
|
||||
// GetHeaderString returns the value of the header as a string. If the header
|
||||
// is not set or the value is not a string an error will be returned.
|
||||
func GetHeaderString(msg eventstream.Message, headerName string) (string, error) {
|
||||
headerVal := msg.Headers.Get(headerName)
|
||||
if headerVal == nil {
|
||||
return "", fmt.Errorf("error header %s not present", headerName)
|
||||
}
|
||||
|
||||
v, ok := headerVal.Get().(string)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("error header value is not a string, %T", headerVal)
|
||||
}
|
||||
|
||||
return v, nil
|
||||
}
|
||||
23
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/shared.go
generated
vendored
Normal file
23
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/shared.go
generated
vendored
Normal file
@@ -0,0 +1,23 @@
|
||||
package eventstreamapi
|
||||
|
||||
// EventStream headers with specific meaning to async API functionality.
|
||||
const (
|
||||
ChunkSignatureHeader = `:chunk-signature` // chunk signature for message
|
||||
DateHeader = `:date` // Date header for signature
|
||||
|
||||
// Message header and values
|
||||
MessageTypeHeader = `:message-type` // Identifies type of message.
|
||||
EventMessageType = `event`
|
||||
ErrorMessageType = `error`
|
||||
ExceptionMessageType = `exception`
|
||||
|
||||
// Message Events
|
||||
EventTypeHeader = `:event-type` // Identifies message event type e.g. "Stats".
|
||||
|
||||
// Message Error
|
||||
ErrorCodeHeader = `:error-code`
|
||||
ErrorMessageHeader = `:error-message`
|
||||
|
||||
// Message Exception
|
||||
ExceptionTypeHeader = `:exception-type`
|
||||
)
|
||||
123
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/signer.go
generated
vendored
Normal file
123
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/signer.go
generated
vendored
Normal file
@@ -0,0 +1,123 @@
|
||||
package eventstreamapi
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/private/protocol/eventstream"
|
||||
)
|
||||
|
||||
var timeNow = time.Now
|
||||
|
||||
// StreamSigner defines an interface for the implementation of signing of event stream payloads
|
||||
type StreamSigner interface {
|
||||
GetSignature(headers, payload []byte, date time.Time) ([]byte, error)
|
||||
}
|
||||
|
||||
// SignEncoder envelopes event stream messages
|
||||
// into an event stream message payload with included
|
||||
// signature headers using the provided signer and encoder.
|
||||
type SignEncoder struct {
|
||||
signer StreamSigner
|
||||
encoder Encoder
|
||||
bufEncoder *BufferEncoder
|
||||
|
||||
closeErr error
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewSignEncoder returns a new SignEncoder using the provided stream signer and
|
||||
// event stream encoder.
|
||||
func NewSignEncoder(signer StreamSigner, encoder Encoder) *SignEncoder {
|
||||
// TODO: Need to pass down logging
|
||||
|
||||
return &SignEncoder{
|
||||
signer: signer,
|
||||
encoder: encoder,
|
||||
bufEncoder: NewBufferEncoder(),
|
||||
}
|
||||
}
|
||||
|
||||
// Close encodes a final event stream signing envelope with an empty event stream
|
||||
// payload. This final end-frame is used to mark the conclusion of the stream.
|
||||
func (s *SignEncoder) Close() error {
|
||||
if s.closed {
|
||||
return s.closeErr
|
||||
}
|
||||
|
||||
if err := s.encode([]byte{}); err != nil {
|
||||
if strings.Contains(err.Error(), "on closed pipe") {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.closeErr = err
|
||||
s.closed = true
|
||||
return s.closeErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Encode takes the provided message and add envelopes the message
|
||||
// with the required signature.
|
||||
func (s *SignEncoder) Encode(msg eventstream.Message) error {
|
||||
payload, err := s.bufEncoder.Encode(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.encode(payload)
|
||||
}
|
||||
|
||||
func (s SignEncoder) encode(payload []byte) error {
|
||||
date := timeNow()
|
||||
|
||||
var msg eventstream.Message
|
||||
msg.Headers.Set(DateHeader, eventstream.TimestampValue(date))
|
||||
msg.Payload = payload
|
||||
|
||||
var headers bytes.Buffer
|
||||
if err := eventstream.EncodeHeaders(&headers, msg.Headers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sig, err := s.signer.GetSignature(headers.Bytes(), msg.Payload, date)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msg.Headers.Set(ChunkSignatureHeader, eventstream.BytesValue(sig))
|
||||
|
||||
return s.encoder.Encode(msg)
|
||||
}
|
||||
|
||||
// BufferEncoder is a utility that provides a buffered
|
||||
// event stream encoder
|
||||
type BufferEncoder struct {
|
||||
encoder Encoder
|
||||
buffer *bytes.Buffer
|
||||
}
|
||||
|
||||
// NewBufferEncoder returns a new BufferEncoder initialized
|
||||
// with a 1024 byte buffer.
|
||||
func NewBufferEncoder() *BufferEncoder {
|
||||
buf := bytes.NewBuffer(make([]byte, 1024))
|
||||
return &BufferEncoder{
|
||||
encoder: eventstream.NewEncoder(buf),
|
||||
buffer: buf,
|
||||
}
|
||||
}
|
||||
|
||||
// Encode returns the encoded message as a byte slice.
|
||||
// The returned byte slice will be modified on the next encode call
|
||||
// and should not be held onto.
|
||||
func (e *BufferEncoder) Encode(msg eventstream.Message) ([]byte, error) {
|
||||
e.buffer.Reset()
|
||||
|
||||
if err := e.encoder.Encode(msg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return e.buffer.Bytes(), nil
|
||||
}
|
||||
129
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/stream_writer.go
generated
vendored
Normal file
129
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/stream_writer.go
generated
vendored
Normal file
@@ -0,0 +1,129 @@
|
||||
package eventstreamapi
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
)
|
||||
|
||||
// StreamWriter provides concurrent safe writing to an event stream.
|
||||
type StreamWriter struct {
|
||||
eventWriter *EventWriter
|
||||
stream chan eventWriteAsyncReport
|
||||
|
||||
done chan struct{}
|
||||
closeOnce sync.Once
|
||||
err *OnceError
|
||||
|
||||
streamCloser io.Closer
|
||||
}
|
||||
|
||||
// NewStreamWriter returns a StreamWriter for the event writer, and stream
|
||||
// closer provided.
|
||||
func NewStreamWriter(eventWriter *EventWriter, streamCloser io.Closer) *StreamWriter {
|
||||
w := &StreamWriter{
|
||||
eventWriter: eventWriter,
|
||||
streamCloser: streamCloser,
|
||||
stream: make(chan eventWriteAsyncReport),
|
||||
done: make(chan struct{}),
|
||||
err: NewOnceError(),
|
||||
}
|
||||
go w.writeStream()
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
// Close terminates the writers ability to write new events to the stream. Any
|
||||
// future call to Send will fail with an error.
|
||||
func (w *StreamWriter) Close() error {
|
||||
w.closeOnce.Do(w.safeClose)
|
||||
return w.Err()
|
||||
}
|
||||
|
||||
func (w *StreamWriter) safeClose() {
|
||||
close(w.done)
|
||||
}
|
||||
|
||||
// ErrorSet returns a channel which will be closed
|
||||
// if an error occurs.
|
||||
func (w *StreamWriter) ErrorSet() <-chan struct{} {
|
||||
return w.err.ErrorSet()
|
||||
}
|
||||
|
||||
// Err returns any error that occurred while attempting to write an event to the
|
||||
// stream.
|
||||
func (w *StreamWriter) Err() error {
|
||||
return w.err.Err()
|
||||
}
|
||||
|
||||
// Send writes a single event to the stream returning an error if the write
|
||||
// failed.
|
||||
//
|
||||
// Send may be called concurrently. Events will be written to the stream
|
||||
// safely.
|
||||
func (w *StreamWriter) Send(ctx aws.Context, event Marshaler) error {
|
||||
if err := w.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resultCh := make(chan error)
|
||||
wrapped := eventWriteAsyncReport{
|
||||
Event: event,
|
||||
Result: resultCh,
|
||||
}
|
||||
|
||||
select {
|
||||
case w.stream <- wrapped:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-w.done:
|
||||
return fmt.Errorf("stream closed, unable to send event")
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-resultCh:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-w.done:
|
||||
return fmt.Errorf("stream closed, unable to send event")
|
||||
}
|
||||
}
|
||||
|
||||
func (w *StreamWriter) writeStream() {
|
||||
defer w.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case wrapper := <-w.stream:
|
||||
err := w.eventWriter.WriteEvent(wrapper.Event)
|
||||
wrapper.ReportResult(w.done, err)
|
||||
if err != nil {
|
||||
w.err.SetError(err)
|
||||
return
|
||||
}
|
||||
|
||||
case <-w.done:
|
||||
if err := w.streamCloser.Close(); err != nil {
|
||||
w.err.SetError(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type eventWriteAsyncReport struct {
|
||||
Event Marshaler
|
||||
Result chan<- error
|
||||
}
|
||||
|
||||
func (e eventWriteAsyncReport) ReportResult(cancel <-chan struct{}, err error) bool {
|
||||
select {
|
||||
case e.Result <- err:
|
||||
return true
|
||||
case <-cancel:
|
||||
return false
|
||||
}
|
||||
}
|
||||
109
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/writer.go
generated
vendored
Normal file
109
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/writer.go
generated
vendored
Normal file
@@ -0,0 +1,109 @@
|
||||
package eventstreamapi
|
||||
|
||||
import (
|
||||
"github.com/aws/aws-sdk-go/private/protocol"
|
||||
"github.com/aws/aws-sdk-go/private/protocol/eventstream"
|
||||
)
|
||||
|
||||
// Marshaler provides a marshaling interface for event types to event stream
|
||||
// messages.
|
||||
type Marshaler interface {
|
||||
MarshalEvent(protocol.PayloadMarshaler) (eventstream.Message, error)
|
||||
}
|
||||
|
||||
// Encoder is an stream encoder that will encode an event stream message for
|
||||
// the transport.
|
||||
type Encoder interface {
|
||||
Encode(eventstream.Message) error
|
||||
}
|
||||
|
||||
// EventWriter provides a wrapper around the underlying event stream encoder
|
||||
// for an io.WriteCloser.
|
||||
type EventWriter struct {
|
||||
encoder Encoder
|
||||
payloadMarshaler protocol.PayloadMarshaler
|
||||
eventTypeFor func(Marshaler) (string, error)
|
||||
}
|
||||
|
||||
// NewEventWriter returns a new event stream writer, that will write to the
|
||||
// writer provided. Use the WriteEvent method to write an event to the stream.
|
||||
func NewEventWriter(encoder Encoder, pm protocol.PayloadMarshaler, eventTypeFor func(Marshaler) (string, error),
|
||||
) *EventWriter {
|
||||
return &EventWriter{
|
||||
encoder: encoder,
|
||||
payloadMarshaler: pm,
|
||||
eventTypeFor: eventTypeFor,
|
||||
}
|
||||
}
|
||||
|
||||
// WriteEvent writes an event to the stream. Returns an error if the event
|
||||
// fails to marshal into a message, or writing to the underlying writer fails.
|
||||
func (w *EventWriter) WriteEvent(event Marshaler) error {
|
||||
msg, err := w.marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return w.encoder.Encode(msg)
|
||||
}
|
||||
|
||||
func (w *EventWriter) marshal(event Marshaler) (eventstream.Message, error) {
|
||||
eventType, err := w.eventTypeFor(event)
|
||||
if err != nil {
|
||||
return eventstream.Message{}, err
|
||||
}
|
||||
|
||||
msg, err := event.MarshalEvent(w.payloadMarshaler)
|
||||
if err != nil {
|
||||
return eventstream.Message{}, err
|
||||
}
|
||||
|
||||
msg.Headers.Set(EventTypeHeader, eventstream.StringValue(eventType))
|
||||
return msg, nil
|
||||
}
|
||||
|
||||
//type EventEncoder struct {
|
||||
// encoder Encoder
|
||||
// ppayloadMarshaler protocol.PayloadMarshaler
|
||||
// eventTypeFor func(Marshaler) (string, error)
|
||||
//}
|
||||
//
|
||||
//func (e EventEncoder) Encode(event Marshaler) error {
|
||||
// msg, err := e.marshal(event)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// return w.encoder.Encode(msg)
|
||||
//}
|
||||
//
|
||||
//func (e EventEncoder) marshal(event Marshaler) (eventstream.Message, error) {
|
||||
// eventType, err := w.eventTypeFor(event)
|
||||
// if err != nil {
|
||||
// return eventstream.Message{}, err
|
||||
// }
|
||||
//
|
||||
// msg, err := event.MarshalEvent(w.payloadMarshaler)
|
||||
// if err != nil {
|
||||
// return eventstream.Message{}, err
|
||||
// }
|
||||
//
|
||||
// msg.Headers.Set(EventTypeHeader, eventstream.StringValue(eventType))
|
||||
// return msg, nil
|
||||
//}
|
||||
//
|
||||
//func (w *EventWriter) marshal(event Marshaler) (eventstream.Message, error) {
|
||||
// eventType, err := w.eventTypeFor(event)
|
||||
// if err != nil {
|
||||
// return eventstream.Message{}, err
|
||||
// }
|
||||
//
|
||||
// msg, err := event.MarshalEvent(w.payloadMarshaler)
|
||||
// if err != nil {
|
||||
// return eventstream.Message{}, err
|
||||
// }
|
||||
//
|
||||
// msg.Headers.Set(EventTypeHeader, eventstream.StringValue(eventType))
|
||||
// return msg, nil
|
||||
//}
|
||||
//
|
||||
166
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/header.go
generated
vendored
Normal file
166
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/header.go
generated
vendored
Normal file
@@ -0,0 +1,166 @@
|
||||
package eventstream
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
// Headers are a collection of EventStream header values.
|
||||
type Headers []Header
|
||||
|
||||
// Header is a single EventStream Key Value header pair.
|
||||
type Header struct {
|
||||
Name string
|
||||
Value Value
|
||||
}
|
||||
|
||||
// Set associates the name with a value. If the header name already exists in
|
||||
// the Headers the value will be replaced with the new one.
|
||||
func (hs *Headers) Set(name string, value Value) {
|
||||
var i int
|
||||
for ; i < len(*hs); i++ {
|
||||
if (*hs)[i].Name == name {
|
||||
(*hs)[i].Value = value
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
*hs = append(*hs, Header{
|
||||
Name: name, Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
// Get returns the Value associated with the header. Nil is returned if the
|
||||
// value does not exist.
|
||||
func (hs Headers) Get(name string) Value {
|
||||
for i := 0; i < len(hs); i++ {
|
||||
if h := hs[i]; h.Name == name {
|
||||
return h.Value
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Del deletes the value in the Headers if it exists.
|
||||
func (hs *Headers) Del(name string) {
|
||||
for i := 0; i < len(*hs); i++ {
|
||||
if (*hs)[i].Name == name {
|
||||
copy((*hs)[i:], (*hs)[i+1:])
|
||||
(*hs) = (*hs)[:len(*hs)-1]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func decodeHeaders(r io.Reader) (Headers, error) {
|
||||
hs := Headers{}
|
||||
|
||||
for {
|
||||
name, err := decodeHeaderName(r)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
// EOF while getting header name means no more headers
|
||||
break
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
value, err := decodeHeaderValue(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hs.Set(name, value)
|
||||
}
|
||||
|
||||
return hs, nil
|
||||
}
|
||||
|
||||
func decodeHeaderName(r io.Reader) (string, error) {
|
||||
var n headerName
|
||||
|
||||
var err error
|
||||
n.Len, err = decodeUint8(r)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
name := n.Name[:n.Len]
|
||||
if _, err := io.ReadFull(r, name); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(name), nil
|
||||
}
|
||||
|
||||
func decodeHeaderValue(r io.Reader) (Value, error) {
|
||||
var raw rawValue
|
||||
|
||||
typ, err := decodeUint8(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
raw.Type = valueType(typ)
|
||||
|
||||
var v Value
|
||||
|
||||
switch raw.Type {
|
||||
case trueValueType:
|
||||
v = BoolValue(true)
|
||||
case falseValueType:
|
||||
v = BoolValue(false)
|
||||
case int8ValueType:
|
||||
var tv Int8Value
|
||||
err = tv.decode(r)
|
||||
v = tv
|
||||
case int16ValueType:
|
||||
var tv Int16Value
|
||||
err = tv.decode(r)
|
||||
v = tv
|
||||
case int32ValueType:
|
||||
var tv Int32Value
|
||||
err = tv.decode(r)
|
||||
v = tv
|
||||
case int64ValueType:
|
||||
var tv Int64Value
|
||||
err = tv.decode(r)
|
||||
v = tv
|
||||
case bytesValueType:
|
||||
var tv BytesValue
|
||||
err = tv.decode(r)
|
||||
v = tv
|
||||
case stringValueType:
|
||||
var tv StringValue
|
||||
err = tv.decode(r)
|
||||
v = tv
|
||||
case timestampValueType:
|
||||
var tv TimestampValue
|
||||
err = tv.decode(r)
|
||||
v = tv
|
||||
case uuidValueType:
|
||||
var tv UUIDValue
|
||||
err = tv.decode(r)
|
||||
v = tv
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown value type %d", raw.Type))
|
||||
}
|
||||
|
||||
// Error could be EOF, let caller deal with it
|
||||
return v, err
|
||||
}
|
||||
|
||||
const maxHeaderNameLen = 255
|
||||
|
||||
type headerName struct {
|
||||
Len uint8
|
||||
Name [maxHeaderNameLen]byte
|
||||
}
|
||||
|
||||
func (v headerName) encode(w io.Writer) error {
|
||||
if err := binary.Write(w, binary.BigEndian, v.Len); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := w.Write(v.Name[:v.Len])
|
||||
return err
|
||||
}
|
||||
506
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/header_value.go
generated
vendored
Normal file
506
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/header_value.go
generated
vendored
Normal file
@@ -0,0 +1,506 @@
|
||||
package eventstream
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const maxHeaderValueLen = 1<<15 - 1 // 2^15-1 or 32KB - 1
|
||||
|
||||
// valueType is the EventStream header value type.
|
||||
type valueType uint8
|
||||
|
||||
// Header value types
|
||||
const (
|
||||
trueValueType valueType = iota
|
||||
falseValueType
|
||||
int8ValueType // Byte
|
||||
int16ValueType // Short
|
||||
int32ValueType // Integer
|
||||
int64ValueType // Long
|
||||
bytesValueType
|
||||
stringValueType
|
||||
timestampValueType
|
||||
uuidValueType
|
||||
)
|
||||
|
||||
func (t valueType) String() string {
|
||||
switch t {
|
||||
case trueValueType:
|
||||
return "bool"
|
||||
case falseValueType:
|
||||
return "bool"
|
||||
case int8ValueType:
|
||||
return "int8"
|
||||
case int16ValueType:
|
||||
return "int16"
|
||||
case int32ValueType:
|
||||
return "int32"
|
||||
case int64ValueType:
|
||||
return "int64"
|
||||
case bytesValueType:
|
||||
return "byte_array"
|
||||
case stringValueType:
|
||||
return "string"
|
||||
case timestampValueType:
|
||||
return "timestamp"
|
||||
case uuidValueType:
|
||||
return "uuid"
|
||||
default:
|
||||
return fmt.Sprintf("unknown value type %d", uint8(t))
|
||||
}
|
||||
}
|
||||
|
||||
type rawValue struct {
|
||||
Type valueType
|
||||
Len uint16 // Only set for variable length slices
|
||||
Value []byte // byte representation of value, BigEndian encoding.
|
||||
}
|
||||
|
||||
func (r rawValue) encodeScalar(w io.Writer, v interface{}) error {
|
||||
return binaryWriteFields(w, binary.BigEndian,
|
||||
r.Type,
|
||||
v,
|
||||
)
|
||||
}
|
||||
|
||||
func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error {
|
||||
binary.Write(w, binary.BigEndian, r.Type)
|
||||
|
||||
_, err := w.Write(v)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r rawValue) encodeBytes(w io.Writer, v []byte) error {
|
||||
if len(v) > maxHeaderValueLen {
|
||||
return LengthError{
|
||||
Part: "header value",
|
||||
Want: maxHeaderValueLen, Have: len(v),
|
||||
Value: v,
|
||||
}
|
||||
}
|
||||
r.Len = uint16(len(v))
|
||||
|
||||
err := binaryWriteFields(w, binary.BigEndian,
|
||||
r.Type,
|
||||
r.Len,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = w.Write(v)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r rawValue) encodeString(w io.Writer, v string) error {
|
||||
if len(v) > maxHeaderValueLen {
|
||||
return LengthError{
|
||||
Part: "header value",
|
||||
Want: maxHeaderValueLen, Have: len(v),
|
||||
Value: v,
|
||||
}
|
||||
}
|
||||
r.Len = uint16(len(v))
|
||||
|
||||
type stringWriter interface {
|
||||
WriteString(string) (int, error)
|
||||
}
|
||||
|
||||
err := binaryWriteFields(w, binary.BigEndian,
|
||||
r.Type,
|
||||
r.Len,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if sw, ok := w.(stringWriter); ok {
|
||||
_, err = sw.WriteString(v)
|
||||
} else {
|
||||
_, err = w.Write([]byte(v))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func decodeFixedBytesValue(r io.Reader, buf []byte) error {
|
||||
_, err := io.ReadFull(r, buf)
|
||||
return err
|
||||
}
|
||||
|
||||
func decodeBytesValue(r io.Reader) ([]byte, error) {
|
||||
var raw rawValue
|
||||
var err error
|
||||
raw.Len, err = decodeUint16(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf := make([]byte, raw.Len)
|
||||
_, err = io.ReadFull(r, buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func decodeStringValue(r io.Reader) (string, error) {
|
||||
v, err := decodeBytesValue(r)
|
||||
return string(v), err
|
||||
}
|
||||
|
||||
// Value represents the abstract header value.
|
||||
type Value interface {
|
||||
Get() interface{}
|
||||
String() string
|
||||
valueType() valueType
|
||||
encode(io.Writer) error
|
||||
}
|
||||
|
||||
// An BoolValue provides eventstream encoding, and representation
|
||||
// of a Go bool value.
|
||||
type BoolValue bool
|
||||
|
||||
// Get returns the underlying type
|
||||
func (v BoolValue) Get() interface{} {
|
||||
return bool(v)
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (v BoolValue) valueType() valueType {
|
||||
if v {
|
||||
return trueValueType
|
||||
}
|
||||
return falseValueType
|
||||
}
|
||||
|
||||
func (v BoolValue) String() string {
|
||||
return strconv.FormatBool(bool(v))
|
||||
}
|
||||
|
||||
// encode encodes the BoolValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v BoolValue) encode(w io.Writer) error {
|
||||
return binary.Write(w, binary.BigEndian, v.valueType())
|
||||
}
|
||||
|
||||
// An Int8Value provides eventstream encoding, and representation of a Go
|
||||
// int8 value.
|
||||
type Int8Value int8
|
||||
|
||||
// Get returns the underlying value.
|
||||
func (v Int8Value) Get() interface{} {
|
||||
return int8(v)
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (Int8Value) valueType() valueType {
|
||||
return int8ValueType
|
||||
}
|
||||
|
||||
func (v Int8Value) String() string {
|
||||
return fmt.Sprintf("0x%02x", int8(v))
|
||||
}
|
||||
|
||||
// encode encodes the Int8Value into an eventstream binary value
|
||||
// representation.
|
||||
func (v Int8Value) encode(w io.Writer) error {
|
||||
raw := rawValue{
|
||||
Type: v.valueType(),
|
||||
}
|
||||
|
||||
return raw.encodeScalar(w, v)
|
||||
}
|
||||
|
||||
func (v *Int8Value) decode(r io.Reader) error {
|
||||
n, err := decodeUint8(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*v = Int8Value(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
// An Int16Value provides eventstream encoding, and representation of a Go
|
||||
// int16 value.
|
||||
type Int16Value int16
|
||||
|
||||
// Get returns the underlying value.
|
||||
func (v Int16Value) Get() interface{} {
|
||||
return int16(v)
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (Int16Value) valueType() valueType {
|
||||
return int16ValueType
|
||||
}
|
||||
|
||||
func (v Int16Value) String() string {
|
||||
return fmt.Sprintf("0x%04x", int16(v))
|
||||
}
|
||||
|
||||
// encode encodes the Int16Value into an eventstream binary value
|
||||
// representation.
|
||||
func (v Int16Value) encode(w io.Writer) error {
|
||||
raw := rawValue{
|
||||
Type: v.valueType(),
|
||||
}
|
||||
return raw.encodeScalar(w, v)
|
||||
}
|
||||
|
||||
func (v *Int16Value) decode(r io.Reader) error {
|
||||
n, err := decodeUint16(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*v = Int16Value(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
// An Int32Value provides eventstream encoding, and representation of a Go
|
||||
// int32 value.
|
||||
type Int32Value int32
|
||||
|
||||
// Get returns the underlying value.
|
||||
func (v Int32Value) Get() interface{} {
|
||||
return int32(v)
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (Int32Value) valueType() valueType {
|
||||
return int32ValueType
|
||||
}
|
||||
|
||||
func (v Int32Value) String() string {
|
||||
return fmt.Sprintf("0x%08x", int32(v))
|
||||
}
|
||||
|
||||
// encode encodes the Int32Value into an eventstream binary value
|
||||
// representation.
|
||||
func (v Int32Value) encode(w io.Writer) error {
|
||||
raw := rawValue{
|
||||
Type: v.valueType(),
|
||||
}
|
||||
return raw.encodeScalar(w, v)
|
||||
}
|
||||
|
||||
func (v *Int32Value) decode(r io.Reader) error {
|
||||
n, err := decodeUint32(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*v = Int32Value(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
// An Int64Value provides eventstream encoding, and representation of a Go
|
||||
// int64 value.
|
||||
type Int64Value int64
|
||||
|
||||
// Get returns the underlying value.
|
||||
func (v Int64Value) Get() interface{} {
|
||||
return int64(v)
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (Int64Value) valueType() valueType {
|
||||
return int64ValueType
|
||||
}
|
||||
|
||||
func (v Int64Value) String() string {
|
||||
return fmt.Sprintf("0x%016x", int64(v))
|
||||
}
|
||||
|
||||
// encode encodes the Int64Value into an eventstream binary value
|
||||
// representation.
|
||||
func (v Int64Value) encode(w io.Writer) error {
|
||||
raw := rawValue{
|
||||
Type: v.valueType(),
|
||||
}
|
||||
return raw.encodeScalar(w, v)
|
||||
}
|
||||
|
||||
func (v *Int64Value) decode(r io.Reader) error {
|
||||
n, err := decodeUint64(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*v = Int64Value(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
// An BytesValue provides eventstream encoding, and representation of a Go
|
||||
// byte slice.
|
||||
type BytesValue []byte
|
||||
|
||||
// Get returns the underlying value.
|
||||
func (v BytesValue) Get() interface{} {
|
||||
return []byte(v)
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (BytesValue) valueType() valueType {
|
||||
return bytesValueType
|
||||
}
|
||||
|
||||
func (v BytesValue) String() string {
|
||||
return base64.StdEncoding.EncodeToString([]byte(v))
|
||||
}
|
||||
|
||||
// encode encodes the BytesValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v BytesValue) encode(w io.Writer) error {
|
||||
raw := rawValue{
|
||||
Type: v.valueType(),
|
||||
}
|
||||
|
||||
return raw.encodeBytes(w, []byte(v))
|
||||
}
|
||||
|
||||
func (v *BytesValue) decode(r io.Reader) error {
|
||||
buf, err := decodeBytesValue(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*v = BytesValue(buf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// An StringValue provides eventstream encoding, and representation of a Go
|
||||
// string.
|
||||
type StringValue string
|
||||
|
||||
// Get returns the underlying value.
|
||||
func (v StringValue) Get() interface{} {
|
||||
return string(v)
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (StringValue) valueType() valueType {
|
||||
return stringValueType
|
||||
}
|
||||
|
||||
func (v StringValue) String() string {
|
||||
return string(v)
|
||||
}
|
||||
|
||||
// encode encodes the StringValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v StringValue) encode(w io.Writer) error {
|
||||
raw := rawValue{
|
||||
Type: v.valueType(),
|
||||
}
|
||||
|
||||
return raw.encodeString(w, string(v))
|
||||
}
|
||||
|
||||
func (v *StringValue) decode(r io.Reader) error {
|
||||
s, err := decodeStringValue(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*v = StringValue(s)
|
||||
return nil
|
||||
}
|
||||
|
||||
// An TimestampValue provides eventstream encoding, and representation of a Go
|
||||
// timestamp.
|
||||
type TimestampValue time.Time
|
||||
|
||||
// Get returns the underlying value.
|
||||
func (v TimestampValue) Get() interface{} {
|
||||
return time.Time(v)
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (TimestampValue) valueType() valueType {
|
||||
return timestampValueType
|
||||
}
|
||||
|
||||
func (v TimestampValue) epochMilli() int64 {
|
||||
nano := time.Time(v).UnixNano()
|
||||
msec := nano / int64(time.Millisecond)
|
||||
return msec
|
||||
}
|
||||
|
||||
func (v TimestampValue) String() string {
|
||||
msec := v.epochMilli()
|
||||
return strconv.FormatInt(msec, 10)
|
||||
}
|
||||
|
||||
// encode encodes the TimestampValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v TimestampValue) encode(w io.Writer) error {
|
||||
raw := rawValue{
|
||||
Type: v.valueType(),
|
||||
}
|
||||
|
||||
msec := v.epochMilli()
|
||||
return raw.encodeScalar(w, msec)
|
||||
}
|
||||
|
||||
func (v *TimestampValue) decode(r io.Reader) error {
|
||||
n, err := decodeUint64(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*v = TimestampValue(timeFromEpochMilli(int64(n)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface
|
||||
func (v TimestampValue) MarshalJSON() ([]byte, error) {
|
||||
return []byte(v.String()), nil
|
||||
}
|
||||
|
||||
func timeFromEpochMilli(t int64) time.Time {
|
||||
secs := t / 1e3
|
||||
msec := t % 1e3
|
||||
return time.Unix(secs, msec*int64(time.Millisecond)).UTC()
|
||||
}
|
||||
|
||||
// An UUIDValue provides eventstream encoding, and representation of a UUID
|
||||
// value.
|
||||
type UUIDValue [16]byte
|
||||
|
||||
// Get returns the underlying value.
|
||||
func (v UUIDValue) Get() interface{} {
|
||||
return v[:]
|
||||
}
|
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (UUIDValue) valueType() valueType {
|
||||
return uuidValueType
|
||||
}
|
||||
|
||||
func (v UUIDValue) String() string {
|
||||
return fmt.Sprintf(`%X-%X-%X-%X-%X`, v[0:4], v[4:6], v[6:8], v[8:10], v[10:])
|
||||
}
|
||||
|
||||
// encode encodes the UUIDValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v UUIDValue) encode(w io.Writer) error {
|
||||
raw := rawValue{
|
||||
Type: v.valueType(),
|
||||
}
|
||||
|
||||
return raw.encodeFixedSlice(w, v[:])
|
||||
}
|
||||
|
||||
func (v *UUIDValue) decode(r io.Reader) error {
|
||||
tv := (*v)[:]
|
||||
return decodeFixedBytesValue(r, tv)
|
||||
}
|
||||
103
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/message.go
generated
vendored
Normal file
103
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/message.go
generated
vendored
Normal file
@@ -0,0 +1,103 @@
|
||||
package eventstream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"hash/crc32"
|
||||
)
|
||||
|
||||
const preludeLen = 8
|
||||
const preludeCRCLen = 4
|
||||
const msgCRCLen = 4
|
||||
const minMsgLen = preludeLen + preludeCRCLen + msgCRCLen
|
||||
const maxPayloadLen = 1024 * 1024 * 16 // 16MB
|
||||
const maxHeadersLen = 1024 * 128 // 128KB
|
||||
const maxMsgLen = minMsgLen + maxHeadersLen + maxPayloadLen
|
||||
|
||||
var crc32IEEETable = crc32.MakeTable(crc32.IEEE)
|
||||
|
||||
// A Message provides the eventstream message representation.
|
||||
type Message struct {
|
||||
Headers Headers
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
func (m *Message) rawMessage() (rawMessage, error) {
|
||||
var raw rawMessage
|
||||
|
||||
if len(m.Headers) > 0 {
|
||||
var headers bytes.Buffer
|
||||
if err := EncodeHeaders(&headers, m.Headers); err != nil {
|
||||
return rawMessage{}, err
|
||||
}
|
||||
raw.Headers = headers.Bytes()
|
||||
raw.HeadersLen = uint32(len(raw.Headers))
|
||||
}
|
||||
|
||||
raw.Length = raw.HeadersLen + uint32(len(m.Payload)) + minMsgLen
|
||||
|
||||
hash := crc32.New(crc32IEEETable)
|
||||
binaryWriteFields(hash, binary.BigEndian, raw.Length, raw.HeadersLen)
|
||||
raw.PreludeCRC = hash.Sum32()
|
||||
|
||||
binaryWriteFields(hash, binary.BigEndian, raw.PreludeCRC)
|
||||
|
||||
if raw.HeadersLen > 0 {
|
||||
hash.Write(raw.Headers)
|
||||
}
|
||||
|
||||
// Read payload bytes and update hash for it as well.
|
||||
if len(m.Payload) > 0 {
|
||||
raw.Payload = m.Payload
|
||||
hash.Write(raw.Payload)
|
||||
}
|
||||
|
||||
raw.CRC = hash.Sum32()
|
||||
|
||||
return raw, nil
|
||||
}
|
||||
|
||||
type messagePrelude struct {
|
||||
Length uint32
|
||||
HeadersLen uint32
|
||||
PreludeCRC uint32
|
||||
}
|
||||
|
||||
func (p messagePrelude) PayloadLen() uint32 {
|
||||
return p.Length - p.HeadersLen - minMsgLen
|
||||
}
|
||||
|
||||
func (p messagePrelude) ValidateLens() error {
|
||||
if p.Length == 0 || p.Length > maxMsgLen {
|
||||
return LengthError{
|
||||
Part: "message prelude",
|
||||
Want: maxMsgLen,
|
||||
Have: int(p.Length),
|
||||
}
|
||||
}
|
||||
if p.HeadersLen > maxHeadersLen {
|
||||
return LengthError{
|
||||
Part: "message headers",
|
||||
Want: maxHeadersLen,
|
||||
Have: int(p.HeadersLen),
|
||||
}
|
||||
}
|
||||
if payloadLen := p.PayloadLen(); payloadLen > maxPayloadLen {
|
||||
return LengthError{
|
||||
Part: "message payload",
|
||||
Want: maxPayloadLen,
|
||||
Have: int(payloadLen),
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type rawMessage struct {
|
||||
messagePrelude
|
||||
|
||||
Headers []byte
|
||||
Payload []byte
|
||||
|
||||
CRC uint32
|
||||
}
|
||||
Reference in New Issue
Block a user