// Code generated by smithy-go-codegen DO NOT EDIT. package s3 import ( "context" "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream" "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream/eventstreamapi" "github.com/aws/aws-sdk-go-v2/service/s3/types" smithy "github.com/aws/smithy-go" "github.com/aws/smithy-go/middleware" smithysync "github.com/aws/smithy-go/sync" smithyhttp "github.com/aws/smithy-go/transport/http" "io" "io/ioutil" "sync" ) // SelectObjectContentEventStreamReader provides the interface for reading events // from a stream. // // The writer's Close method must allow multiple concurrent calls. type SelectObjectContentEventStreamReader interface { Events() <-chan types.SelectObjectContentEventStream Close() error Err() error } type selectObjectContentEventStream struct { stream chan types.SelectObjectContentEventStream decoder *eventstream.Decoder eventStream io.ReadCloser err *smithysync.OnceErr payloadBuf []byte done chan struct{} closeOnce sync.Once } func newSelectObjectContentEventStream(readCloser io.ReadCloser, decoder *eventstream.Decoder) *selectObjectContentEventStream { w := &selectObjectContentEventStream{ stream: make(chan types.SelectObjectContentEventStream), decoder: decoder, eventStream: readCloser, err: smithysync.NewOnceErr(), done: make(chan struct{}), payloadBuf: make([]byte, 10*1024), } go w.readEventStream() return w } func (r *selectObjectContentEventStream) Events() <-chan types.SelectObjectContentEventStream { return r.stream } func (r *selectObjectContentEventStream) readEventStream() { defer r.Close() defer close(r.stream) for { r.payloadBuf = r.payloadBuf[0:0] decodedMessage, err := r.decoder.Decode(r.eventStream, r.payloadBuf) if err != nil { if err == io.EOF { return } select { case <-r.done: return default: r.err.SetError(err) return } } event, err := r.deserializeEventMessage(&decodedMessage) if err != nil { r.err.SetError(err) return } select { case r.stream <- event: case <-r.done: return } } } func (r *selectObjectContentEventStream) deserializeEventMessage(msg *eventstream.Message) (types.SelectObjectContentEventStream, error) { messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader) if messageType == nil { return nil, fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader) } switch messageType.String() { case eventstreamapi.EventMessageType: var v types.SelectObjectContentEventStream if err := awsRestxml_deserializeEventStreamSelectObjectContentEventStream(&v, msg); err != nil { return nil, err } return v, nil case eventstreamapi.ExceptionMessageType: return nil, awsRestxml_deserializeEventStreamExceptionSelectObjectContentEventStream(msg) case eventstreamapi.ErrorMessageType: errorCode := "UnknownError" errorMessage := errorCode if header := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); header != nil { errorCode = header.String() } if header := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); header != nil { errorMessage = header.String() } return nil, &smithy.GenericAPIError{ Code: errorCode, Message: errorMessage, } default: mc := msg.Clone() return nil, &UnknownEventMessageError{ Type: messageType.String(), Message: &mc, } } } func (r *selectObjectContentEventStream) ErrorSet() <-chan struct{} { return r.err.ErrorSet() } func (r *selectObjectContentEventStream) Close() error { r.closeOnce.Do(r.safeClose) return r.Err() } func (r *selectObjectContentEventStream) safeClose() { close(r.done) r.eventStream.Close() } func (r *selectObjectContentEventStream) Err() error { return r.err.Err() } func (r *selectObjectContentEventStream) Closed() <-chan struct{} { return r.done } type awsRestxml_deserializeOpEventStreamSelectObjectContent struct { LogEventStreamWrites bool LogEventStreamReads bool } func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) ID() string { return "OperationEventStreamDeserializer" } func (m *awsRestxml_deserializeOpEventStreamSelectObjectContent) HandleDeserialize(ctx context.Context, in middleware.DeserializeInput, next middleware.DeserializeHandler) ( out middleware.DeserializeOutput, metadata middleware.Metadata, err error, ) { defer func() { if err == nil { return } m.closeResponseBody(out) }() logger := middleware.GetLogger(ctx) request, ok := in.Request.(*smithyhttp.Request) if !ok { return out, metadata, fmt.Errorf("unknown transport type: %T", in.Request) } _ = request out, metadata, err = next.HandleDeserialize(ctx, in) if err != nil { return out, metadata, err } deserializeOutput, ok := out.RawResponse.(*smithyhttp.Response) if !ok { return out, metadata, fmt.Errorf("unknown transport type: %T", out.RawResponse) } _ = deserializeOutput output, ok := out.Result.(*SelectObjectContentOutput) if out.Result != nil && !ok { return out, metadata, fmt.Errorf("unexpected output result type: %T", out.Result) } else if out.Result == nil { output = &SelectObjectContentOutput{} out.Result = output } eventReader := newSelectObjectContentEventStream( deserializeOutput.Body, eventstream.NewDecoder(func(options *eventstream.DecoderOptions) { options.Logger = logger options.LogMessages = m.LogEventStreamReads }), ) defer func() { if err == nil { return } _ = eventReader.Close() }() output.eventStream = NewSelectObjectContentEventStream(func(stream *SelectObjectContentEventStream) { stream.Reader = eventReader }) go output.eventStream.waitStreamClose() return out, metadata, nil } func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) closeResponseBody(out middleware.DeserializeOutput) { if resp, ok := out.RawResponse.(*smithyhttp.Response); ok && resp != nil && resp.Body != nil { _, _ = io.Copy(ioutil.Discard, resp.Body) _ = resp.Body.Close() } } func addEventStreamSelectObjectContentMiddleware(stack *middleware.Stack, options Options) error { return stack.Deserialize.Insert(&awsRestxml_deserializeOpEventStreamSelectObjectContent{ LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(), LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(), }, "OperationDeserializer", middleware.Before) } // UnknownEventMessageError provides an error when a message is received from the stream, // but the reader is unable to determine what kind of message it is. type UnknownEventMessageError struct { Type string Message *eventstream.Message } // Error retruns the error message string. func (e *UnknownEventMessageError) Error() string { return "unknown event stream message type, " + e.Type } func setSafeEventStreamClientLogMode(o *Options, operation string) { switch operation { case "SelectObjectContent": toggleEventStreamClientLogMode(o, false, true) return default: return } } func toggleEventStreamClientLogMode(o *Options, request, response bool) { mode := o.ClientLogMode if request && mode.IsRequestWithBody() { mode.ClearRequestWithBody() mode |= aws.LogRequest } if response && mode.IsResponseWithBody() { mode.ClearResponseWithBody() mode |= aws.LogResponse } o.ClientLogMode = mode }