pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/bxcodec/goqueue/commit/90f5e491bf267f47e9a88647167f2dd5b935622c

e157469407.css" /> refactor: hide unnecessary fns (#13) · bxcodec/goqueue@90f5e49 · GitHub
Skip to content

Commit 90f5e49

Browse files
authored
refactor: hide unnecessary fns (#13)
1 parent dfdf7c0 commit 90f5e49

30 files changed

+513
-604
lines changed

consumer/option.go

Lines changed: 0 additions & 83 deletions
This file was deleted.

consumer/service.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package consumer
2+
3+
import (
4+
"github.com/bxcodec/goqueue/internal/consumer"
5+
"github.com/bxcodec/goqueue/internal/consumer/rabbitmq"
6+
"github.com/bxcodec/goqueue/options"
7+
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
8+
)
9+
10+
func NewConsumer(platform options.Platform, opts ...consumerOpts.ConsumerOptionFunc) consumer.Consumer {
11+
switch platform {
12+
case consumerOpts.ConsumerPlatformRabbitMQ:
13+
return rabbitmq.NewConsumer(opts...)
14+
case consumerOpts.ConsumerPlatformGooglePubSub:
15+
// TODO (bxcodec): implement google pubsub publisher
16+
case consumerOpts.ConsumerPlatformSQS:
17+
// TODO (bxcodec): implement sns publisher
18+
default:
19+
panic("unknown publisher platform")
20+
}
21+
return nil
22+
}

encoding.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,25 @@ import (
66
"sync"
77

88
headerVal "github.com/bxcodec/goqueue/headers/value"
9+
"github.com/bxcodec/goqueue/interfaces"
910
)
1011

1112
// EncoderFn is a function type that encodes a message into a byte slice.
1213
// It takes a context and a message as input and returns the encoded data and an error (if any).
13-
type EncoderFn func(ctx context.Context, m Message) (data []byte, err error)
14+
type EncoderFn func(ctx context.Context, m interfaces.Message) (data []byte, err error)
1415

1516
// DecoderFn is a function type that decodes a byte slice into a Message.
1617
// It takes a context and a byte slice as input and returns a Message and an error.
17-
type DecoderFn func(ctx context.Context, data []byte) (m Message, err error)
18+
type DecoderFn func(ctx context.Context, data []byte) (m interfaces.Message, err error)
1819

1920
var (
2021
// JSONEncoder is an implementation of the EncoderFn interface
2122
// that encodes a Message into JSON format.
22-
JSONEncoder EncoderFn = func(ctx context.Context, m Message) (data []byte, err error) {
23+
JSONEncoder EncoderFn = func(ctx context.Context, m interfaces.Message) (data []byte, err error) {
2324
return json.Marshal(m)
2425
}
2526
// JSONDecoder is a DecoderFn implementation that decodes JSON data into a Message.
26-
JSONDecoder DecoderFn = func(ctx context.Context, data []byte) (m Message, err error) {
27+
JSONDecoder DecoderFn = func(ctx context.Context, data []byte) (m interfaces.Message, err error) {
2728
err = json.Unmarshal(data, &m)
2829
return
2930
}

examples/rabbitmq/main.go

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010

1111
"github.com/bxcodec/goqueue"
1212
"github.com/bxcodec/goqueue/consumer"
13-
rmqConsumer "github.com/bxcodec/goqueue/consumer/rabbitmq"
13+
"github.com/bxcodec/goqueue/interfaces"
1414
"github.com/bxcodec/goqueue/middleware"
15+
"github.com/bxcodec/goqueue/options"
16+
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
17+
publisherOpts "github.com/bxcodec/goqueue/options/publisher"
1518
"github.com/bxcodec/goqueue/publisher"
16-
rmqPublisher "github.com/bxcodec/goqueue/publisher/rabbitmq"
1719
)
1820

1921
func initExchange(ch *amqp.Channel, exchangeName string) error {
@@ -35,9 +37,14 @@ func main() {
3537
panic(err)
3638
}
3739

38-
rmqPub := rmqPublisher.NewPublisher(rmqConn,
39-
publisher.WithPublisherID("publisher_id"),
40-
publisher.WithMiddlewares(
40+
rmqPub := publisher.NewPublisher(
41+
publisherOpts.PublisherPlatformRabbitMQ,
42+
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
43+
Conn: rmqConn,
44+
PublisherChannelPoolSize: 5,
45+
}),
46+
publisherOpts.WithPublisherID("publisher_id"),
47+
publisherOpts.WithMiddlewares(
4148
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
4249
middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
4350
),
@@ -56,35 +63,36 @@ func main() {
5663
panic(err)
5764
}
5865
defer consumerChannel.Close()
59-
60-
rmqConsumer := rmqConsumer.NewConsumer(
61-
publisherChannel,
62-
consumerChannel,
63-
consumer.WithMiddlewares(
66+
rmqConsumer := consumer.NewConsumer(
67+
consumerOpts.ConsumerPlatformRabbitMQ,
68+
consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
69+
ConsumerChannel: consumerChannel,
70+
ReQueueChannel: publisherChannel,
71+
}),
72+
consumerOpts.WithConsumerID("consumer_id"),
73+
consumerOpts.WithMiddlewares(
6474
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
6575
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
6676
),
67-
consumer.WithQueueName("consumer_queue"),
68-
consumer.WithConsumerID("consumer_id"),
69-
consumer.WithBatchMessageSize(1),
70-
consumer.WithMaxRetryFailedMessage(3),
71-
consumer.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
72-
consumer.WithTopicName("goqueue"),
77+
consumerOpts.WithMaxRetryFailedMessage(3),
78+
consumerOpts.WithBatchMessageSize(1),
79+
consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
80+
consumerOpts.WithTopicName("goqueue"),
81+
consumerOpts.WithQueueName("consumer_queue"),
7382
)
7483

7584
queueSvc := goqueue.NewQueueService(
76-
goqueue.WithPublisher(rmqPub),
77-
goqueue.WithConsumer(rmqConsumer),
78-
goqueue.WithMessageHandler(handler()),
85+
options.WithConsumer(rmqConsumer),
86+
options.WithPublisher(rmqPub),
87+
options.WithMessageHandler(handler()),
7988
)
80-
8189
go func() {
8290
for i := 0; i < 10; i++ {
8391
data := map[string]interface{}{
8492
"message": fmt.Sprintf("Hello World %d", i),
8593
}
8694
jbyt, _ := json.Marshal(data)
87-
err := queueSvc.Publish(context.Background(), goqueue.Message{
95+
err := queueSvc.Publish(context.Background(), interfaces.Message{
8896
Data: data,
8997
Action: "goqueue.payments.create",
9098
Topic: "goqueue",
@@ -105,8 +113,8 @@ func main() {
105113
}
106114
}
107115

108-
func handler() goqueue.InboundMessageHandlerFunc {
109-
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
116+
func handler() interfaces.InboundMessageHandlerFunc {
117+
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
110118
data := m.Data
111119
jbyt, _ := json.Marshal(data)
112120
fmt.Println("Message Received: ", string(jbyt))

delayfn.go renamed to interfaces/delayfn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package goqueue
1+
package interfaces
22

33
type DelayFn func(retries int64) (delay int64)
44

consumer.go renamed to interfaces/inboundmessagehandler.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,7 @@
1-
package goqueue
1+
package interfaces
22

33
import "context"
44

5-
// Consumer represents an entity that consumes messages from a queue.
6-
//
7-
//go:generate mockery --name Consumer
8-
type Consumer interface {
9-
// Consume consumes messages from the queue and passes them to the provided handler.
10-
// It takes a context, an InboundMessageHandler, and a map of metadata as parameters.
11-
// It returns an error if there was a problem consuming the messages.
12-
Consume(ctx context.Context, handler InboundMessageHandler, meta map[string]interface{}) (err error)
13-
14-
// Stop stops the consumer from consuming messages.
15-
// It takes a context as a parameter and returns an error if there was a problem stopping the consumer.
16-
Stop(ctx context.Context) (err error)
17-
}
18-
195
//go:generate mockery --name InboundMessageHandler
206
type InboundMessageHandler interface {
217
HandleMessage(ctx context.Context, m InboundMessage) (err error)

message.go renamed to interfaces/message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package goqueue
1+
package interfaces
22

33
import (
44
"time"

publisher.go renamed to interfaces/publisher.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,7 @@
1-
package goqueue
1+
package interfaces
22

33
import "context"
44

5-
// Publisher represents an interface for publishing messages.
6-
//
7-
//go:generate mockery --name Publisher
8-
type Publisher interface {
9-
PublisherHandler
10-
Close(ctx context.Context) (err error)
11-
}
12-
135
// PublisherHandler is an interface that defines the behavior of a message publisher.
146
//
157
//go:generate mockery --name PublisherHandler

internal/consumer/consumer.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package consumer
2+
3+
import (
4+
"context"
5+
6+
"github.com/bxcodec/goqueue/interfaces"
7+
)
8+
9+
// Consumer represents an entity that consumes messages from a queue.
10+
//
11+
//go:generate mockery --name Consumer
12+
type Consumer interface {
13+
// Consume consumes messages from the queue and passes them to the provided handler.
14+
// It takes a context, an InboundMessageHandler, and a map of metadata as parameters.
15+
// It returns an error if there was a problem consuming the messages.
16+
Consume(ctx context.Context, handler interfaces.InboundMessageHandler, meta map[string]interface{}) (err error)
17+
18+
// Stop stops the consumer from consuming messages.
19+
// It takes a context as a parameter and returns an error if there was a problem stopping the consumer.
20+
Stop(ctx context.Context) (err error)
21+
}

0 commit comments

Comments
 (0)
pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy