Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ MemoryStore is a high-performance, thread-safe, in-memory key-value store implem
- 💪 High-performance using go-json
- 🔒 Clean shutdown mechanism
- 📝 Comprehensive documentation
- 📡 Pattern-based Publish/Subscribe system

## Installation

Expand Down Expand Up @@ -87,6 +88,49 @@ func main() {
}
```

### Working with PubSub

MemoryStore includes a powerful publish/subscribe system for real-time communication:

```go
func main() {
store := memorystore.NewMemoryStore()
defer store.Stop()

// Subscribe to user updates
userEvents, err := store.Subscribe("user:*")
if err != nil {
log.Fatal(err)
}

// Listen for messages in a goroutine
go func() {
for msg := range userEvents {
log.Printf("Received update: %s", string(msg))
}
}()

// Publish updates
err = store.Publish("user:123", []byte("status:active"))
if err != nil {
log.Fatal(err)
}
}
```

The PubSub system supports:
- Pattern-based subscriptions (`user:*`, `order:*:status`)
- Non-blocking message delivery
- Automatic cleanup of disconnected subscribers
- Thread-safe concurrent access
- Integration with existing store operations

Methods available:
- `Subscribe(pattern string) (<-chan []byte, error)`: Subscribe to a pattern
- `Publish(channel string, message []byte) error`: Publish a message
- `Unsubscribe(pattern string) error`: Unsubscribe from a pattern
- `SubscriberCount(pattern string) int`: Get number of subscribers

### Expiration and Cleanup

Keys automatically expire after their specified duration:
Expand Down Expand Up @@ -162,7 +206,6 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
- [ ] Add support for batch operations
- [ ] Implement data persistence
- [ ] Add metrics and monitoring
- [ ] Support for pattern-based key deletion
- [ ] Add compression options

## Support
Expand Down
154 changes: 154 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// main.go
// Package main provides a demonstration of the memorystore package functionality.
// It shows various use cases including storing/retrieving data, handling expiration,
// and proper error handling.
package main

import (
"encoding/json"
"log"
"strings"
"sync"
"time"

"github.com/BryceWayne/MemoryStore/memorystore"
Expand Down Expand Up @@ -119,6 +123,155 @@ func demonstrateStoreLifecycle() {
}
}

// demonstratePubSub shows the publish/subscribe functionality
// with pattern matching and multiple subscribers.
// demonstratePubSub shows the publish/subscribe functionality
// with complex pattern matching and JSON message support.
func demonstratePubSub(ms *memorystore.MemoryStore) {
log.Println("\n=== Demonstrating PubSub System ===")

var wg sync.WaitGroup

// 1. Complex Pattern Matching Examples
log.Println("Setting up pattern-based subscriptions...")
patterns := map[string]<-chan []byte{} // Store channels for cleanup

// Subscribe to various patterns
subscribePatterns := []string{
"users:*:status", // Match all user statuses
"users:admin:*", // Match all admin events
"orders:*.completed", // Match all completed orders
"notifications:*:high", // Match high-priority notifications
"system:*.error", // Match all system errors
}

for _, pattern := range subscribePatterns {
ch, err := ms.Subscribe(pattern)
if err != nil {
log.Printf("Failed to subscribe to %s: %v", pattern, err)
continue
}
patterns[pattern] = ch
log.Printf("Subscribed to pattern: %s", pattern)
}

// 2. JSON Message Integration
type UserStatus struct {
UserID string `json:"user_id"`
Status string `json:"status"`
LastSeen time.Time `json:"last_seen"`
Metadata map[string]string `json:"metadata"`
}

type OrderEvent struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
CompletedAt time.Time `json:"completed_at"`
Total float64 `json:"total"`
}

// Set up listeners for each pattern
for pattern, ch := range patterns {
wg.Add(1)
go func(pattern string, ch <-chan []byte) {
defer wg.Done()
log.Printf("Listening on pattern: %s", pattern)

select {
case msg := <-ch:
// Try to decode as UserStatus if it's a user event
if strings.HasPrefix(pattern, "users:") {
var status UserStatus
if err := json.Unmarshal(msg, &status); err == nil {
log.Printf("[%s] User Status Update: %+v", pattern, status)
} else {
log.Printf("[%s] Raw message: %s", pattern, string(msg))
}
} else if strings.HasPrefix(pattern, "orders:") {
// Try to decode as OrderEvent
var order OrderEvent
if err := json.Unmarshal(msg, &order); err == nil {
log.Printf("[%s] Order Event: %+v", pattern, order)
} else {
log.Printf("[%s] Raw message: %s", pattern, string(msg))
}
} else {
log.Printf("[%s] Message received: %s", pattern, string(msg))
}
case <-time.After(2 * time.Second):
log.Printf("[%s] No message received", pattern)
}
}(pattern, ch)
}

// Publish various types of messages
time.Sleep(100 * time.Millisecond) // Ensure subscribers are ready
log.Println("\nPublishing messages...")

// Publish JSON user status
adminStatus := UserStatus{
UserID: "admin123",
Status: "online",
LastSeen: time.Now(),
Metadata: map[string]string{"location": "NYC", "device": "desktop"},
}
statusJSON, _ := json.Marshal(adminStatus)
ms.Publish("users:admin:status", statusJSON)

// Publish JSON order completion
order := OrderEvent{
OrderID: "ORD-789",
Status: "completed",
CompletedAt: time.Now(),
Total: 299.99,
}
orderJSON, _ := json.Marshal(order)
ms.Publish("orders:ORD-789.completed", orderJSON)

// Publish system error
ms.Publish("system:database.error", []byte("Connection timeout"))

// Publish high-priority notification
ms.Publish("notifications:user123:high", []byte("Account security alert"))

// Wait for message processing
wg.Wait()

log.Println("\nDemonstrating pattern matching scenarios...")
// Show which patterns match different keys
testCases := []struct {
channel string
patterns []string
}{
{
channel: "users:admin:login",
patterns: []string{"users:admin:*"},
},
{
channel: "orders:xyz-789.completed",
patterns: []string{"orders:*.completed"},
},
{
channel: "notifications:admin:high",
patterns: []string{"notifications:*:high"},
},
}

for _, tc := range testCases {
log.Printf("Channel '%s' matches patterns: %v", tc.channel, tc.patterns)
}

// Cleanup
log.Println("\nCleaning up subscriptions...")
for pattern := range patterns {
if err := ms.Unsubscribe(pattern); err != nil {
log.Printf("Error unsubscribing from %s: %v", pattern, err)
} else {
log.Printf("Unsubscribed from %s", pattern)
}
}
}

func main() {
// Create a new MemoryStore instance
ms := memorystore.NewMemoryStore()
Expand All @@ -134,6 +287,7 @@ func main() {
demonstrateBasicOperations(ms)
demonstrateExpiration(ms)
demonstrateNonExistentKeys(ms)
demonstratePubSub(ms) // Add this line
demonstrateStoreLifecycle()

log.Println("\nAll demonstrations completed successfully")
Expand Down
5 changes: 5 additions & 0 deletions memorystore/memorystore.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// memorystore/memorystore.go
// Package memorystore provides a simple in-memory cache implementation with automatic cleanup
// of expired items. It supports both raw byte storage and JSON serialization/deserialization
// of structured data.
Expand All @@ -22,6 +23,7 @@ type item struct {
type MemoryStore struct {
mu sync.RWMutex // Protects access to the store map
store map[string]item // Internal storage for cache items
ps *pubSubManager // PubSub manager for cache events
ctx context.Context // Context for controlling the cleanup worker
cancelFunc context.CancelFunc // Function to stop the cleanup worker
wg sync.WaitGroup // WaitGroup for cleanup goroutine synchronization
Expand All @@ -37,6 +39,7 @@ func NewMemoryStore() *MemoryStore {
ctx: ctx,
cancelFunc: cancel,
}
ms.initPubSub()
ms.startCleanupWorker()
return ms
}
Expand All @@ -60,6 +63,8 @@ func (m *MemoryStore) Stop() error {
m.cancelFunc()
m.cancelFunc = nil

m.cleanupPubSub()

// Wait for cleanup goroutine to finish
m.wg.Wait()

Expand Down
Loading
Loading