Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/reatom/reatom/llms.txt

Use this file to discover all available pages before exploring further.

Overview

The withConnectHook extension executes a callback when the atom gains its first subscriber (connects) and optionally runs cleanup when the last subscriber disconnects. This is perfect for managing resources, subscriptions, or side effects that should only exist while the atom is actively being used.

Type Signature

function withConnectHook<Target extends AtomLike>(
  cb: (target: Target) => MaybeUnsubscribe
): Ext<Target>

function withDisconnectHook<Target extends AtomLike>(
  cb: (target: Target) => void
): Ext<Target>

type MaybeUnsubscribe = void | (() => void) | Promise<void | (() => void)>

Parameters

cb
function
required
Callback executed when the atom connects (gains first subscriber).Parameters:
  • target: The atom being connected
Returns:
  • void: No cleanup needed
  • () => void: Cleanup function to run on disconnect
  • Promise<void | (() => void)>: Async initialization with optional cleanup

Examples

Basic Connection Tracking

import { atom } from '@reatom/core'
import { withConnectHook, withDisconnectHook } from '@reatom/core/extensions'

const data = atom(0, 'data').extend(
  withConnectHook(() => {
    console.log('Connected')
  }),
  withDisconnectHook(() => {
    console.log('Disconnected')
  })
)

data() // No logs (just reading, not subscribing)

const unsubscribe = data.subscribe()
// Logs: "Connected"

unsubscribe()
// Logs: "Disconnected"

Cleanup on Disconnect

const data = atom(0, 'data').extend(
  withConnectHook(() => {
    const intervalId = setInterval(() => {
      console.log('Polling...')
    }, 1000)
    
    // Return cleanup function
    return () => clearInterval(intervalId)
  })
)

const unsub = data.subscribe()
// Starts polling

unsub()
// Stops polling (cleanup called)

WebSocket Connection

const liveData = atom<Message[]>([], 'liveData').extend(
  withConnectHook(() => {
    const ws = new WebSocket('wss://api.example.com')
    
    ws.onmessage = (event) => {
      const message = JSON.parse(event.data)
      liveData.set((messages) => [...messages, message])
    }
    
    // Cleanup: close WebSocket on disconnect
    return () => ws.close()
  })
)

// WebSocket opens when first subscriber connects
const unsub = liveData.subscribe()

// WebSocket closes when last subscriber disconnects
unsub()

Async Initialization

const remoteData = atom<Data | null>(null, 'remoteData').extend(
  withConnectHook(async () => {
    // Fetch initial data
    const response = await fetch('/api/data')
    const data = await response.json()
    remoteData.set(data)
    
    // Start polling
    const intervalId = setInterval(async () => {
      const response = await fetch('/api/data')
      const data = await response.json()
      remoteData.set(data)
    }, 5000)
    
    // Return cleanup
    return () => clearInterval(intervalId)
  })
)

Multiple Subscribers

const connect = vi.fn()
const disconnect = vi.fn()

const data = atom(0, 'data').extend(
  withConnectHook(() => connect()),
  withDisconnectHook(() => disconnect())
)

const unsub1 = data.subscribe()
// connect() called once

const unsub2 = data.subscribe()
// connect() NOT called again (already connected)

unsub1()
// disconnect() NOT called (still has unsub2)

unsub2()
// disconnect() called (last subscriber removed)

Computed Dependency Chain

const a = atom(0, 'a').extend(
  withConnectHook(() => console.log('a connected')),
  withDisconnectHook(() => console.log('a disconnected'))
)

const b = computed(() => a(), 'b').extend(
  withConnectHook(() => console.log('b connected')),
  withDisconnectHook(() => console.log('b disconnected'))
)

const unsubB = b.subscribe()
// Logs: "b connected", "a connected"

unsubB()
// Logs: "b disconnected", "a disconnected"

Abort on Disconnect

import { abortVar } from '@reatom/core/methods'

const polling = atom(0, 'polling').extend(
  withConnectHook(() => {
    const poll = async () => {
      while (true) {
        await wrap(sleep(1000))
        // Fetch data...
      }
    }
    
    poll().catch((error) => {
      if (!isAbort(error)) throw error
    })
    
    // Abort polling on disconnect
    abortVar.subscribe(() => {
      console.log('Polling aborted')
    })
  })
)

const unsub = polling.subscribe()
// Starts polling

unsub()
// Aborts polling

Resource Management

const videoPlayer = atom(
  { playing: false, currentTime: 0 },
  'videoPlayer'
).extend(
  withConnectHook(() => {
    const video = document.createElement('video')
    video.src = '/video.mp4'
    
    // Setup event listeners
    const onTimeUpdate = () => {
      videoPlayer.set((state) => ({
        ...state,
        currentTime: video.currentTime,
      }))
    }
    
    video.addEventListener('timeupdate', onTimeUpdate)
    
    // Cleanup
    return () => {
      video.removeEventListener('timeupdate', onTimeUpdate)
      video.pause()
      video.src = ''
    }
  })
)

Use Cases

Server-Sent Events (SSE)

const notifications = atom<Notification[]>([], 'notifications').extend(
  withConnectHook(() => {
    const eventSource = new EventSource('/api/notifications')
    
    eventSource.onmessage = (event) => {
      const notification = JSON.parse(event.data)
      notifications.set((list) => [...list, notification])
    }
    
    return () => eventSource.close()
  })
)

Database Subscription

const todos = atom<Todo[]>([], 'todos').extend(
  withConnectHook(async () => {
    // Initial load
    const initial = await db.todos.getAll()
    todos.set(initial)
    
    // Subscribe to changes
    const unsubscribe = db.todos.subscribe((changes) => {
      todos.set(changes)
    })
    
    return unsubscribe
  })
)

Analytics Tracking

const pageView = atom(
  { path: '/', timestamp: Date.now() },
  'pageView'
).extend(
  withConnectHook(() => {
    analytics.track('page_view_subscribed', {
      path: pageView().path,
    })
    
    return () => {
      analytics.track('page_view_unsubscribed')
    }
  })
)

Geolocation Tracking

const location = atom<GeolocationPosition | null>(null, 'location').extend(
  withConnectHook(() => {
    const watchId = navigator.geolocation.watchPosition(
      (position) => location.set(position),
      (error) => console.error('Geolocation error:', error)
    )
    
    return () => {
      navigator.geolocation.clearWatch(watchId)
    }
  })
)

Event Bus Subscription

const messages = atom<Message[]>([], 'messages').extend(
  withConnectHook(() => {
    const handleMessage = (msg: Message) => {
      messages.set((list) => [...list, msg])
    }
    
    eventBus.on('message', handleMessage)
    
    return () => {
      eventBus.off('message', handleMessage)
    }
  })
)

Reactive Resize Observer

const elementSize = atom(
  { width: 0, height: 0 },
  'elementSize'
).extend(
  withConnectHook(() => {
    const element = document.getElementById('target')
    if (!element) return
    
    const observer = new ResizeObserver((entries) => {
      const { width, height } = entries[0].contentRect
      elementSize.set({ width, height })
    })
    
    observer.observe(element)
    
    return () => observer.disconnect()
  })
)

Pattern: Circular Subscriptions

const externalResource = atom(0, 'externalResource').extend(
  withConnectHook(() => {
    effect(() => {
      resource.data() // Subscribe to resource data
    })
  })
)

const resource = computed(async () => {
  await wrap(sleep())
  return 'done'
}, 'resource').extend(withAsyncData({ initState: 'init' }))

resource.data.extend(
  withConnectHook(() => externalResource.subscribe())
)

const track = subscribe(resource.data)
await wrap(resource())
// Handles circular dependency correctly