diff --git a/README.md b/README.md index 18a167c..58c4336 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ MemoryStore is a high-performance, thread-safe, in-memory key-value store implem - 💪 High-performance using go-json - 🔒 Clean shutdown mechanism - 📝 Comprehensive documentation +- 📡 Pattern-based Publish/Subscribe system ## Installation @@ -87,6 +88,49 @@ func main() { } ``` +### Working with PubSub + +MemoryStore includes a powerful publish/subscribe system for real-time communication: + +```go +func main() { + store := memorystore.NewMemoryStore() + defer store.Stop() + + // Subscribe to user updates + userEvents, err := store.Subscribe("user:*") + if err != nil { + log.Fatal(err) + } + + // Listen for messages in a goroutine + go func() { + for msg := range userEvents { + log.Printf("Received update: %s", string(msg)) + } + }() + + // Publish updates + err = store.Publish("user:123", []byte("status:active")) + if err != nil { + log.Fatal(err) + } +} +``` + +The PubSub system supports: +- Pattern-based subscriptions (`user:*`, `order:*:status`) +- Non-blocking message delivery +- Automatic cleanup of disconnected subscribers +- Thread-safe concurrent access +- Integration with existing store operations + +Methods available: +- `Subscribe(pattern string) (<-chan []byte, error)`: Subscribe to a pattern +- `Publish(channel string, message []byte) error`: Publish a message +- `Unsubscribe(pattern string) error`: Unsubscribe from a pattern +- `SubscriberCount(pattern string) int`: Get number of subscribers + ### Expiration and Cleanup Keys automatically expire after their specified duration: @@ -162,7 +206,6 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file - [ ] Add support for batch operations - [ ] Implement data persistence - [ ] Add metrics and monitoring -- [ ] Support for pattern-based key deletion - [ ] Add compression options ## Support diff --git a/main.go b/main.go index cf61f22..381b14e 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,14 @@ +// main.go // Package main provides a demonstration of the memorystore package functionality. // It shows various use cases including storing/retrieving data, handling expiration, // and proper error handling. package main import ( + "encoding/json" "log" + "strings" + "sync" "time" "github.com/BryceWayne/MemoryStore/memorystore" @@ -119,6 +123,155 @@ func demonstrateStoreLifecycle() { } } +// demonstratePubSub shows the publish/subscribe functionality +// with pattern matching and multiple subscribers. +// demonstratePubSub shows the publish/subscribe functionality +// with complex pattern matching and JSON message support. +func demonstratePubSub(ms *memorystore.MemoryStore) { + log.Println("\n=== Demonstrating PubSub System ===") + + var wg sync.WaitGroup + + // 1. Complex Pattern Matching Examples + log.Println("Setting up pattern-based subscriptions...") + patterns := map[string]<-chan []byte{} // Store channels for cleanup + + // Subscribe to various patterns + subscribePatterns := []string{ + "users:*:status", // Match all user statuses + "users:admin:*", // Match all admin events + "orders:*.completed", // Match all completed orders + "notifications:*:high", // Match high-priority notifications + "system:*.error", // Match all system errors + } + + for _, pattern := range subscribePatterns { + ch, err := ms.Subscribe(pattern) + if err != nil { + log.Printf("Failed to subscribe to %s: %v", pattern, err) + continue + } + patterns[pattern] = ch + log.Printf("Subscribed to pattern: %s", pattern) + } + + // 2. JSON Message Integration + type UserStatus struct { + UserID string `json:"user_id"` + Status string `json:"status"` + LastSeen time.Time `json:"last_seen"` + Metadata map[string]string `json:"metadata"` + } + + type OrderEvent struct { + OrderID string `json:"order_id"` + Status string `json:"status"` + CompletedAt time.Time `json:"completed_at"` + Total float64 `json:"total"` + } + + // Set up listeners for each pattern + for pattern, ch := range patterns { + wg.Add(1) + go func(pattern string, ch <-chan []byte) { + defer wg.Done() + log.Printf("Listening on pattern: %s", pattern) + + select { + case msg := <-ch: + // Try to decode as UserStatus if it's a user event + if strings.HasPrefix(pattern, "users:") { + var status UserStatus + if err := json.Unmarshal(msg, &status); err == nil { + log.Printf("[%s] User Status Update: %+v", pattern, status) + } else { + log.Printf("[%s] Raw message: %s", pattern, string(msg)) + } + } else if strings.HasPrefix(pattern, "orders:") { + // Try to decode as OrderEvent + var order OrderEvent + if err := json.Unmarshal(msg, &order); err == nil { + log.Printf("[%s] Order Event: %+v", pattern, order) + } else { + log.Printf("[%s] Raw message: %s", pattern, string(msg)) + } + } else { + log.Printf("[%s] Message received: %s", pattern, string(msg)) + } + case <-time.After(2 * time.Second): + log.Printf("[%s] No message received", pattern) + } + }(pattern, ch) + } + + // Publish various types of messages + time.Sleep(100 * time.Millisecond) // Ensure subscribers are ready + log.Println("\nPublishing messages...") + + // Publish JSON user status + adminStatus := UserStatus{ + UserID: "admin123", + Status: "online", + LastSeen: time.Now(), + Metadata: map[string]string{"location": "NYC", "device": "desktop"}, + } + statusJSON, _ := json.Marshal(adminStatus) + ms.Publish("users:admin:status", statusJSON) + + // Publish JSON order completion + order := OrderEvent{ + OrderID: "ORD-789", + Status: "completed", + CompletedAt: time.Now(), + Total: 299.99, + } + orderJSON, _ := json.Marshal(order) + ms.Publish("orders:ORD-789.completed", orderJSON) + + // Publish system error + ms.Publish("system:database.error", []byte("Connection timeout")) + + // Publish high-priority notification + ms.Publish("notifications:user123:high", []byte("Account security alert")) + + // Wait for message processing + wg.Wait() + + log.Println("\nDemonstrating pattern matching scenarios...") + // Show which patterns match different keys + testCases := []struct { + channel string + patterns []string + }{ + { + channel: "users:admin:login", + patterns: []string{"users:admin:*"}, + }, + { + channel: "orders:xyz-789.completed", + patterns: []string{"orders:*.completed"}, + }, + { + channel: "notifications:admin:high", + patterns: []string{"notifications:*:high"}, + }, + } + + for _, tc := range testCases { + log.Printf("Channel '%s' matches patterns: %v", tc.channel, tc.patterns) + } + + // Cleanup + log.Println("\nCleaning up subscriptions...") + for pattern := range patterns { + if err := ms.Unsubscribe(pattern); err != nil { + log.Printf("Error unsubscribing from %s: %v", pattern, err) + } else { + log.Printf("Unsubscribed from %s", pattern) + } + } +} + func main() { // Create a new MemoryStore instance ms := memorystore.NewMemoryStore() @@ -134,6 +287,7 @@ func main() { demonstrateBasicOperations(ms) demonstrateExpiration(ms) demonstrateNonExistentKeys(ms) + demonstratePubSub(ms) // Add this line demonstrateStoreLifecycle() log.Println("\nAll demonstrations completed successfully") diff --git a/memorystore/memorystore.go b/memorystore/memorystore.go index c200842..1674985 100644 --- a/memorystore/memorystore.go +++ b/memorystore/memorystore.go @@ -1,3 +1,4 @@ +// memorystore/memorystore.go // Package memorystore provides a simple in-memory cache implementation with automatic cleanup // of expired items. It supports both raw byte storage and JSON serialization/deserialization // of structured data. @@ -22,6 +23,7 @@ type item struct { type MemoryStore struct { mu sync.RWMutex // Protects access to the store map store map[string]item // Internal storage for cache items + ps *pubSubManager // PubSub manager for cache events ctx context.Context // Context for controlling the cleanup worker cancelFunc context.CancelFunc // Function to stop the cleanup worker wg sync.WaitGroup // WaitGroup for cleanup goroutine synchronization @@ -37,6 +39,7 @@ func NewMemoryStore() *MemoryStore { ctx: ctx, cancelFunc: cancel, } + ms.initPubSub() ms.startCleanupWorker() return ms } @@ -60,6 +63,8 @@ func (m *MemoryStore) Stop() error { m.cancelFunc() m.cancelFunc = nil + m.cleanupPubSub() + // Wait for cleanup goroutine to finish m.wg.Wait() diff --git a/memorystore/pubsub.go b/memorystore/pubsub.go new file mode 100644 index 0000000..9c87633 --- /dev/null +++ b/memorystore/pubsub.go @@ -0,0 +1,212 @@ +// memorystore/pubsub.go +package memorystore + +import ( + "context" + "errors" + "strings" + "sync" +) + +// Constants for PubSub configuration +const ( + defaultChannelBuffer = 100 // Default buffer size for subscriber channels +) + +// Common errors for PubSub operations +var ( + ErrInvalidPattern = errors.New("invalid subscription pattern") + ErrStoreStopped = errors.New("store has been stopped") +) + +// subscription represents an individual subscriber +type subscription struct { + pattern string // The pattern this subscription matches + ch chan []byte // Channel for sending messages to the subscriber + ctx context.Context // Context for managing subscription lifetime + cancel func() // Function to cancel the subscription context +} + +// pubSubManager handles all publish/subscribe operations +type pubSubManager struct { + mu sync.RWMutex + subscriptions map[string][]*subscription // Pattern -> subscriptions mapping + wg sync.WaitGroup // For graceful shutdown +} + +// newPubSubManager creates and initializes a new pubSubManager +func newPubSubManager() *pubSubManager { + return &pubSubManager{ + subscriptions: make(map[string][]*subscription), + } +} + +// Subscribe creates a new subscription for the given pattern +func (m *MemoryStore) Subscribe(pattern string) (<-chan []byte, error) { + if m.IsStopped() { + return nil, ErrStoreStopped + } + + if pattern == "" { + return nil, ErrInvalidPattern + } + + // Create subscription context + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan []byte, defaultChannelBuffer) + + sub := &subscription{ + pattern: pattern, + ch: ch, + ctx: ctx, + cancel: cancel, + } + + // Add subscription to manager + m.ps.mu.Lock() + m.ps.subscriptions[pattern] = append(m.ps.subscriptions[pattern], sub) + m.ps.mu.Unlock() + + // Add to wait group for graceful shutdown + m.ps.wg.Add(1) + + // Cleanup goroutine + go func() { + defer m.ps.wg.Done() + <-ctx.Done() + m.removeSubscription(pattern, sub) + close(ch) + }() + + return ch, nil +} + +// Publish sends a message to all subscribers matching the given channel +func (m *MemoryStore) Publish(channel string, message []byte) error { + if m.IsStopped() { + return ErrStoreStopped + } + + m.ps.mu.RLock() + defer m.ps.mu.RUnlock() + + // Find all matching subscriptions and publish to them + for pattern, subs := range m.ps.subscriptions { + if matchesPattern(channel, pattern) { + for _, sub := range subs { + select { + case <-sub.ctx.Done(): + continue // Skip closed subscriptions + default: + select { + case sub.ch <- message: + default: + // Channel is full, skip this subscriber + // Could add logging or metrics here + } + } + } + } + } + + return nil +} + +// Unsubscribe cancels all subscriptions for the given pattern +func (m *MemoryStore) Unsubscribe(pattern string) error { + if m.IsStopped() { + return ErrStoreStopped + } + + m.ps.mu.Lock() + defer m.ps.mu.Unlock() + + subs, exists := m.ps.subscriptions[pattern] + if !exists { + return nil + } + + // Cancel all subscriptions for this pattern + for _, sub := range subs { + sub.cancel() + } + + delete(m.ps.subscriptions, pattern) + return nil +} + +// SubscriberCount returns the number of subscribers for a given pattern +func (m *MemoryStore) SubscriberCount(pattern string) int { + m.ps.mu.RLock() + defer m.ps.mu.RUnlock() + + count := 0 + for p, subs := range m.ps.subscriptions { + if p == pattern { + count += len(subs) + } + } + return count +} + +// removeSubscription removes a specific subscription +func (m *MemoryStore) removeSubscription(pattern string, sub *subscription) { + m.ps.mu.Lock() + defer m.ps.mu.Unlock() + + subs := m.ps.subscriptions[pattern] + for i, s := range subs { + if s == sub { + // Remove subscription from slice + subs = append(subs[:i], subs[i+1:]...) + break + } + } + + if len(subs) == 0 { + delete(m.ps.subscriptions, pattern) + } else { + m.ps.subscriptions[pattern] = subs + } +} + +// matchesPattern checks if a channel matches a subscription pattern +func matchesPattern(channel, pattern string) bool { + // Split pattern and channel into segments + patternParts := strings.Split(pattern, ":") + channelParts := strings.Split(channel, ":") + + if len(patternParts) != len(channelParts) { + return false + } + + // Check each segment + for i, patternPart := range patternParts { + if patternPart != "*" && patternPart != channelParts[i] { + return false + } + } + + return true +} + +// initPubSub initializes the PubSub system for the MemoryStore +func (m *MemoryStore) initPubSub() { + m.ps = newPubSubManager() +} + +// cleanupPubSub performs cleanup of the PubSub system during store shutdown +func (m *MemoryStore) cleanupPubSub() { + m.ps.mu.Lock() + // Cancel all subscriptions + for pattern, subs := range m.ps.subscriptions { + for _, sub := range subs { + sub.cancel() + } + delete(m.ps.subscriptions, pattern) + } + m.ps.mu.Unlock() + + // Wait for all subscription goroutines to finish + m.ps.wg.Wait() +} diff --git a/memorystore/pubsub_test.go b/memorystore/pubsub_test.go new file mode 100644 index 0000000..1c9e0dd --- /dev/null +++ b/memorystore/pubsub_test.go @@ -0,0 +1,344 @@ +// memorystore/pubsub_test.go +package memorystore + +import ( + "fmt" + "sync" + "testing" + "time" +) + +// TestMemoryStore_Subscribe tests basic subscription functionality +func TestMemoryStore_Subscribe(t *testing.T) { + tests := []struct { + name string + pattern string + wantErr bool + publishKey string + publishMsg []byte + shouldMatch bool + }{ + { + name: "exact match subscription", + pattern: "user:123", + wantErr: false, + publishKey: "user:123", + publishMsg: []byte("test message"), + shouldMatch: true, + }, + { + name: "wildcard subscription", + pattern: "user:*", + wantErr: false, + publishKey: "user:123", + publishMsg: []byte("test message"), + shouldMatch: true, + }, + { + name: "non-matching subscription", + pattern: "user:123", + wantErr: false, + publishKey: "user:456", + publishMsg: []byte("test message"), + shouldMatch: false, + }, + { + name: "empty pattern", + pattern: "", + wantErr: true, + publishKey: "", + publishMsg: nil, + shouldMatch: false, + }, + { + name: "multiple wildcards", + pattern: "user:*:status", + wantErr: false, + publishKey: "user:123:status", + publishMsg: []byte("active"), + shouldMatch: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ms := NewMemoryStore() + defer ms.Stop() + + // Create subscription + ch, err := ms.Subscribe(tt.pattern) + if (err != nil) != tt.wantErr { + t.Errorf("Subscribe() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if err != nil { + return + } + + // Test publishing + var receivedMsg []byte + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + select { + case msg := <-ch: + receivedMsg = msg + case <-time.After(100 * time.Millisecond): + // Timeout if no message received + } + }() + + err = ms.Publish(tt.publishKey, tt.publishMsg) + if err != nil { + t.Errorf("Publish() error = %v", err) + } + + wg.Wait() + + if tt.shouldMatch { + if receivedMsg == nil { + t.Error("Expected to receive message but got none") + } else if string(receivedMsg) != string(tt.publishMsg) { + t.Errorf("Got message %s, want %s", string(receivedMsg), string(tt.publishMsg)) + } + } else { + if receivedMsg != nil { + t.Errorf("Got unexpected message %s", string(receivedMsg)) + } + } + }) + } +} + +// TestMemoryStore_Unsubscribe tests unsubscription functionality +func TestMemoryStore_Unsubscribe(t *testing.T) { + ms := NewMemoryStore() + defer ms.Stop() + + pattern := "test:*" + ch, err := ms.Subscribe(pattern) + if err != nil { + t.Fatalf("Subscribe() error = %v", err) + } + + // Unsubscribe + err = ms.Unsubscribe(pattern) + if err != nil { + t.Errorf("Unsubscribe() error = %v", err) + } + + // Verify channel is closed + select { + case _, ok := <-ch: + if ok { + t.Error("Channel should be closed after unsubscribe") + } + case <-time.After(100 * time.Millisecond): + t.Error("Channel should be closed immediately") + } + + // Verify subscriber count is 0 + if count := ms.SubscriberCount(pattern); count != 0 { + t.Errorf("SubscriberCount() = %v, want 0", count) + } +} + +// TestMemoryStore_MultipleSubscribers tests multiple subscribers to the same pattern +func TestMemoryStore_MultipleSubscribers(t *testing.T) { + ms := NewMemoryStore() + defer ms.Stop() + + pattern := "test:*" + subscribers := 5 + message := []byte("test message") + timeout := time.After(2 * time.Second) // Add timeout + + var channels []<-chan []byte + var wg sync.WaitGroup + + // Create subscribers + for i := 0; i < subscribers; i++ { + ch, err := ms.Subscribe(pattern) + if err != nil { + t.Fatalf("Subscribe() error = %v", err) + } + channels = append(channels, ch) + } + + // Verify subscriber count + if count := ms.SubscriberCount(pattern); count != subscribers { + t.Errorf("SubscriberCount() = %v, want %v", count, subscribers) + } + + // Test message delivery to all subscribers + wg.Add(subscribers) + receivedCount := 0 + var mu sync.Mutex + + for i := 0; i < subscribers; i++ { + go func(ch <-chan []byte) { + defer wg.Done() + select { + case msg := <-ch: + if string(msg) == string(message) { + mu.Lock() + receivedCount++ + mu.Unlock() + } + case <-timeout: + // Timeout - don't block forever + t.Error("Timeout waiting for message") + } + }(channels[i]) + } + + // Publish message + err := ms.Publish("test:123", message) + if err != nil { + t.Fatalf("Publish() error = %v", err) + } + + // Wait with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(3 * time.Second): + t.Fatal("Test timed out") + } + + if receivedCount != subscribers { + t.Errorf("Message received by %v subscribers, want %v", receivedCount, subscribers) + } +} + +func TestMemoryStore_PubSub_Concurrent(t *testing.T) { + ms := NewMemoryStore() + defer ms.Stop() + + const publishers = 5 + const subscribers = 5 + const messagesPerPublisher = 20 + + var wg sync.WaitGroup + received := make(map[string]int) + var mu sync.Mutex + + // Channel to signal when all messages have been published + allPublished := make(chan struct{}) + + // Create subscribers + var subWg sync.WaitGroup + for i := 0; i < subscribers; i++ { + ch, err := ms.Subscribe("test:*") + if err != nil { + t.Fatalf("Subscribe() error = %v", err) + } + + subWg.Add(1) + go func() { + defer subWg.Done() + for { + select { + case msg, ok := <-ch: + if !ok { + return + } + mu.Lock() + received[string(msg)]++ + mu.Unlock() + case <-allPublished: + // Give a short time to process any remaining messages + time.Sleep(100 * time.Millisecond) + return + } + } + }() + } + + // Create publishers + wg.Add(publishers) + for i := 0; i < publishers; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < messagesPerPublisher; j++ { + msg := []byte(fmt.Sprintf("msg-%d-%d", id, j)) + if err := ms.Publish("test:123", msg); err != nil { + t.Errorf("Publish() error = %v", err) + } + time.Sleep(time.Millisecond) // Small delay to prevent message flood + } + }(i) + } + + // Wait for all publishers to finish + wg.Wait() + close(allPublished) + + // Wait for subscribers to finish processing + done := make(chan struct{}) + go func() { + subWg.Wait() + close(done) + }() + + // Wait with timeout + select { + case <-done: + // Success + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for subscribers to finish") + } + + // Verify results + mu.Lock() + totalMessages := len(received) + messageCount := 0 + for _, count := range received { + messageCount += count + } + mu.Unlock() + + expectedTotal := publishers * messagesPerPublisher * subscribers + if messageCount != expectedTotal { + t.Errorf("Expected %d total message receipts, got %d", expectedTotal, messageCount) + } + if totalMessages == 0 { + t.Error("No messages were received") + } +} + +// BenchmarkMemoryStore_PubSub benchmarks publish/subscribe operations +func BenchmarkMemoryStore_PubSub(b *testing.B) { + ms := NewMemoryStore() + defer ms.Stop() + + ch, err := ms.Subscribe("bench:*") + if err != nil { + b.Fatalf("Subscribe() error = %v", err) + } + + // Start consumer + go func() { + for range ch { + // Consume messages + } + }() + + message := []byte("benchmark message") + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := ms.Publish("bench:test", message); err != nil { + b.Fatalf("Publish() error = %v", err) + } + } +}