
Stream API Code Examples

This guide provides practical code examples for working with the DBConvert Streams Stream API in various programming languages. For the complete API reference, see the Stream API Reference.

Prerequisites

Before using these examples, ensure you have:

  • A valid DBConvert Streams API key
  • The API server running (default: http://127.0.0.1:8020/api/v1)
  • At least one configured source and target connection (see Connection API Reference)
  • Required language-specific dependencies:
    • curl: jq for extracting IDs from JSON responses in the shell examples
    • Python: requests library (pip install requests)
    • Node.js: axios library (npm install axios)
    • PowerShell: Version 5.1 or later

Important

Before using any connection or stream endpoints, you must first call the /user/configs endpoint to load existing configurations:

bash
# Using curl
curl -X GET "http://127.0.0.1:8020/api/v1/user/configs" \
  -H "X-API-Key: your_api_key_here"

This endpoint loads all user configurations, including existing connections and stream configurations. Call it:

  • When starting your application
  • Before listing or managing connections
  • Before creating or managing streams

Understanding ID Lifecycle

When working with streams, you'll deal with different types of IDs that represent separate entities:

  1. Connection IDs - Used to identify database connections:

    conn_2sZyDMeIbdaXnN9OL7vYzzAiZU5  # Source connection
    conn_2sZyBc83hypbiTMrNTotKPyjwFC  # Target connection
  2. Stream Configuration IDs - Templates for creating streams:

    config_2sZyDMeIbdaXnN9OL7vYzzAiZU5  # Reusable stream configuration
  3. Stream IDs - Independent stream instances:

    stream_2sZyDMeIbdaXnN9OL7vYzzAiZU5  # A running stream instance
    stream_3tZyDMeIbdaXnN9OL7vYzzBiZU6  # Another stream from the same config

TIP

Stream configurations are templates that can be used to create multiple independent streams. Each stream gets its own unique stream_ ID, separate from the configuration that created it.
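
For instance, once you have a configuration ID, every call to its start endpoint creates a new, independently managed stream. Below is a minimal Python sketch of this lifecycle, assuming the endpoints shown in the curl workflow later in this guide and a placeholder configuration ID:

python
import requests

API_URL = "http://127.0.0.1:8020/api/v1"
HEADERS = {"X-API-Key": "your_api_key_here"}
config_id = "config_2sZyDMeIbdaXnN9OL7vYzzAiZU5"  # placeholder config_ ID

# Load existing configurations first (required, see the note above)
requests.get(f"{API_URL}/user/configs", headers=HEADERS)

# Each start call on the same configuration creates a new stream_ instance
stream_a = requests.post(f"{API_URL}/streams/{config_id}/start", headers=HEADERS).json()
stream_b = requests.post(f"{API_URL}/streams/{config_id}/start", headers=HEADERS).json()
print(stream_a["id"], stream_b["id"])  # two different stream_ IDs

# The streams are managed independently of each other and of the configuration
requests.post(f"{API_URL}/streams/{stream_a['id']}/pause", headers=HEADERS)
requests.post(f"{API_URL}/streams/{stream_b['id']}/stop", headers=HEADERS)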

Language Examples

Choose your preferred language:

  • curl - For quick command-line testing
  • Python - Using the requests library
  • Node.js - Using axios with async/await
  • PowerShell - Using PowerShell classes

Each example demonstrates the same end-to-end workflow, outlined in the next section.

Complete Workflow Example

Here's a complete workflow showing how to:

  1. Create source and target connections (getting conn_ IDs)
  2. Test the connections
  3. Create a stream configuration (getting a config_ ID)
  4. Create multiple streams from the configuration (getting stream_ IDs)
  5. Manage the running streams independently

Using curl

Complete command-line examples for testing and managing streams:

bash
# Set your API key and base URL
export API_KEY="your_api_key_here"
export API_URL="http://127.0.0.1:8020/api/v1"

# Load user configurations (REQUIRED)
curl -X GET "$API_URL/user/configs" \
  -H "X-API-Key: $API_KEY"

# 1. Create source MySQL connection
source_conn=$(curl -s -X POST "$API_URL/connections" \
  -H "X-API-Key: $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-source",
    "type": "mysql",
    "host": "localhost",
    "port": 3306,
    "username": "root",
    "password": "your_password",
    "database": "source_db"
  }')

# Extract source connection ID
source_id=$(echo "$source_conn" | jq -r '.id')
echo "Source connection ID: $source_id"  # conn_...

# 2. Create target PostgreSQL connection
target_conn=$(curl -s -X POST "$API_URL/connections" \
  -H "X-API-Key: $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-target",
    "type": "postgresql",
    "host": "localhost",
    "port": 5432,
    "username": "postgres",
    "password": "your_password",
    "database": "target_db"
  }')

# Extract target connection ID
target_id=$(echo "$target_conn" | jq -r '.id')
echo "Target connection ID: $target_id"  # conn_...

# 3. Test both connections
curl -X POST "$API_URL/connections/$source_id/ping" \
  -H "X-API-Key: $API_KEY"

curl -X POST "$API_URL/connections/$target_id/ping" \
  -H "X-API-Key: $API_KEY"

# 4. Create a stream configuration (template)
stream_config=$(curl -s -X POST "$API_URL/stream-configs" \
  -H "X-API-Key: $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql_to_postgresql",
    "source": "'"$source_id"'",
    "target": "'"$target_id"'",
    "mode": "cdc",
    "dataBundleSize": 40,
    "reportingIntervals": {
      "source": 3,
      "target": 3
    },
    "tables": [
      {
        "name": "users",
        "operations": ["insert", "update", "delete"]
      }
    ],
    "createStructure": true
  }')

# Extract configuration ID
config_id=$(echo "$stream_config" | jq -r '.id')
echo "Stream configuration ID: $config_id"  # config_...

# 5. Create multiple streams from the same configuration
# Start first stream
stream1_start=$(curl -s -X POST "$API_URL/streams/$config_id/start" \
  -H "X-API-Key: $API_KEY")

# Extract first stream ID
stream1_id=$(echo "$stream1_start" | jq -r '.id')
echo "First stream ID: $stream1_id"  # stream_...

# Start second stream from the same configuration
stream2_start=$(curl -s -X POST "$API_URL/streams/$config_id/start" \
  -H "X-API-Key: $API_KEY")

# Extract second stream ID
stream2_id=$(echo "$stream2_start" | jq -r '.id')
echo "Second stream ID: $stream2_id"  # stream_...

# 6. Manage streams independently
# Get statistics for first stream
curl -X GET "$API_URL/streams/$stream1_id/stats" \
  -H "X-API-Key: $API_KEY"

# Pause first stream
curl -X POST "$API_URL/streams/$stream1_id/pause" \
  -H "X-API-Key: $API_KEY"

# Get statistics for second stream
curl -X GET "$API_URL/streams/$stream2_id/stats" \
  -H "X-API-Key: $API_KEY"

# Stop second stream
curl -X POST "$API_URL/streams/$stream2_id/stop" \
  -H "X-API-Key: $API_KEY"

# The configuration remains available for creating more streams
echo "Configuration $config_id can be used to create more streams"

Using Python

Here's a complete Python example that handles both connections and streams:

python
import requests
from typing import Optional, Dict, List, Any
from dataclasses import dataclass

@dataclass
class ConnectionConfig:
    name: str
    type: str
    host: str
    port: int
    username: str
    password: str
    database: str
    schema: Optional[str] = None

@dataclass
class StreamConfig:
    name: str
    source: str  # Connection ID
    target: str  # Connection ID
    mode: str
    tables: List[Dict[str, Any]]
    data_bundle_size: int = 40
    reporting_intervals: Optional[Dict[str, int]] = None
    create_structure: bool = True

class DBConvertAPI:
    """Combined client for DBConvert Streams API."""
    
    def __init__(self, api_key: str, base_url: str = "http://127.0.0.1:8020/api/v1"):
        self.base_url = base_url
        self.headers = {
            "X-API-Key": api_key,
            "Content-Type": "application/json"
        }

    def load_user_configs(self) -> bool:
        """Load user configurations (REQUIRED before using other endpoints)."""
        try:
            response = requests.get(
                f"{self.base_url}/user/configs",
                headers=self.headers
            )
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"Error loading user configurations: {e}")
            return False

    def create_connection(self, config: ConnectionConfig) -> Optional[Dict[str, Any]]:
        """Create a new database connection."""
        # Drop unset optional fields (e.g. schema) before sending
        payload = {k: v for k, v in config.__dict__.items() if v is not None}
        try:
            response = requests.post(
                f"{self.base_url}/connections",
                headers=self.headers,
                json=payload
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error creating connection: {e}")
            return None

    def test_connection(self, conn_id: str) -> bool:
        """Test if a connection is working."""
        try:
            response = requests.post(
                f"{self.base_url}/connections/{conn_id}/ping",
                headers=self.headers
            )
            response.raise_for_status()
            result = response.json()
            return result.get('ping') == 'ok'
        except requests.exceptions.RequestException as e:
            print(f"Error testing connection: {e}")
            return False

    def create_stream(self, config: StreamConfig) -> Optional[Dict[str, Any]]:
        """Create a new stream configuration."""
        # Map the dataclass fields to the camelCase names expected by the API
        payload = {
            "name": config.name,
            "source": config.source,
            "target": config.target,
            "mode": config.mode,
            "tables": config.tables,
            "dataBundleSize": config.data_bundle_size,
            "createStructure": config.create_structure
        }
        if config.reporting_intervals:
            payload["reportingIntervals"] = config.reporting_intervals
        try:
            response = requests.post(
                f"{self.base_url}/stream-configs",
                headers=self.headers,
                json=payload
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error creating stream configuration: {e}")
            return None

# Usage example showing the complete workflow
def main():
    api = DBConvertAPI("your_api_key_here")
    
    # Load user configurations (REQUIRED)
    if not api.load_user_configs():
        print("Failed to load user configurations")
        return

    # 1. Create source MySQL connection
    source_config = ConnectionConfig(
        name="mysql-source",
        type="mysql",
        host="localhost",
        port=3306,
        username="root",
        password="your_password",
        database="source_db"
    )
    
    source_conn = api.create_connection(source_config)
    if not source_conn:
        print("Failed to create source connection")
        return
    
    # 2. Create target PostgreSQL connection
    target_config = ConnectionConfig(
        name="postgres-target",
        type="postgresql",
        host="localhost",
        port=5432,
        username="postgres",
        password="your_password",
        database="target_db"
    )
    
    target_conn = api.create_connection(target_config)
    if not target_conn:
        print("Failed to create target connection")
        return
    
    # 3. Test both connections
    if not api.test_connection(source_conn['id']):
        print("Source connection test failed")
        return
        
    if not api.test_connection(target_conn['id']):
        print("Target connection test failed")
        return
    
    # 4. Create a stream configuration using the connection IDs
    stream_config = StreamConfig(
        name="mysql_to_postgresql",
        source=source_conn['id'],  # Use source connection ID
        target=target_conn['id'],  # Use target connection ID
        mode="cdc",
        tables=[{
            "name": "users",
            "operations": ["insert", "update", "delete"]
        }],
        data_bundle_size=40,
        reporting_intervals={"source": 3, "target": 3},
        create_structure=True
    )
    
    stream = api.create_stream(stream_config)
    if stream:
        print(f"Successfully created stream: {stream['id']}")

if __name__ == "__main__":
    main()
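
The class above stops after creating the stream configuration. The sketch below, which assumes the start and stats endpoints shown in the curl workflow above, illustrates how the remaining steps could look in Python; the start_and_monitor helper is illustrative, not part of the class:

python
# Illustrative continuation: start a stream from the configuration created in
# main() and read its statistics. Endpoint paths follow the curl workflow above.
def start_and_monitor(api: DBConvertAPI, config_id: str) -> None:
    # Starting from a config_ ID returns a new stream_ instance
    response = requests.post(
        f"{api.base_url}/streams/{config_id}/start",
        headers=api.headers
    )
    response.raise_for_status()
    stream = response.json()
    print(f"Started stream: {stream['id']}")

    # Fetch current statistics for the running stream
    stats = requests.get(
        f"{api.base_url}/streams/{stream['id']}/stats",
        headers=api.headers
    )
    stats.raise_for_status()
    print(f"Processed records: {stats.json()['source']['counter']}")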

Using Node.js

A TypeScript-friendly Node.js implementation with async/await:

typescript
import axios, { AxiosInstance } from 'axios';

interface StreamConfig {
    name: string;
    source: string;
    target: string;
    mode: 'convert' | 'cdc';
    dataBundleSize?: number;
    reportingIntervals?: {
        source: number;
        target: number;
    };
    tables: Array<{
        name: string;
        operations?: Array<'insert' | 'update' | 'delete'>;
    }>;
    createStructure?: boolean;
}

interface StreamResponse {
    id: string;
    name: string;
    status: 'READY' | 'RUNNING' | 'PAUSED' | 'STOPPED' | 'FINISHED' | 'FAILED';
    created?: number;
    started?: number;
    paused?: number;
    resumed?: number;
    stopped?: number;
}

interface StreamStats {
    streamID: string;
    source: NodeStats;
    target: NodeStats;
}

interface NodeStats {
    counter: number;
    failedCounter: number;
    prevDataSize: number;
    sumDataSize: number;
    reportingInterval: number;
    start: string;
    duration: string;
    avgRate: number;
    status: string;
}

class DBConvertStreamsAPI {
    private client: AxiosInstance;

    constructor(apiKey: string, baseURL: string = 'http://127.0.0.1:8020/api/v1') {
        this.client = axios.create({
            baseURL,
            headers: {
                'X-API-Key': apiKey,
                'Content-Type': 'application/json'
            }
        });
    }

    async loadUserConfigs(): Promise<boolean> {
        try {
            await this.client.get('/user/configs');
            return true;
        } catch (error) {
            console.error('Error loading user configurations:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async createStreamConfig(config: StreamConfig): Promise<StreamResponse> {
        try {
            const { data } = await this.client.post('/stream-configs', config);
            return data;
        } catch (error) {
            console.error('Error creating stream configuration:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async startStream(configId: string): Promise<StreamResponse> {
        // Starting from a configuration ID returns a new stream_ instance
        try {
            const { data } = await this.client.post(`/streams/${configId}/start`);
            return data;
        } catch (error) {
            console.error('Error starting stream:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async getStreamStats(streamId: string): Promise<StreamStats> {
        try {
            const { data } = await this.client.get(`/streams/${streamId}/stats`);
            return data;
        } catch (error) {
            console.error('Error getting stream statistics:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async pauseStream(streamId: string): Promise<StreamResponse> {
        try {
            const { data } = await this.client.post(`/streams/${streamId}/pause`);
            return data;
        } catch (error) {
            console.error('Error pausing stream:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async resumeStream(streamId: string): Promise<StreamResponse> {
        try {
            const { data } = await this.client.post(`/streams/${streamId}/resume`);
            return data;
        } catch (error) {
            console.error('Error resuming stream:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async stopStream(streamId: string): Promise<StreamResponse> {
        try {
            const { data } = await this.client.post(`/streams/${streamId}/stop`);
            return data;
        } catch (error) {
            console.error('Error stopping stream:', 
                         error.response?.data || error.message);
            throw error;
        }
    }
}

// Usage example
async function main() {
    const api = new DBConvertStreamsAPI('your_api_key_here');

    try {
        // Load user configurations (REQUIRED)
        await api.loadUserConfigs();

        // Create a stream configuration
        const streamConfig: StreamConfig = {
            name: 'mysql_to_postgresql',
            source: 'conn_source_id',
            target: 'conn_target_id',
            mode: 'cdc',
            dataBundleSize: 40,
            reportingIntervals: {
                source: 3,
                target: 3
            },
            tables: [
                {
                    name: 'users',
                    operations: ['insert', 'update', 'delete']
                }
            ],
            createStructure: true
        };

        // Create the stream configuration (returns a config_ ID)
        const newStream = await api.createStreamConfig(streamConfig);
        console.log('Created stream configuration:', newStream.id);

        // Start a stream from the configuration (returns a new stream_ instance)
        const startResult = await api.startStream(newStream.id);
        console.log('Stream started:', startResult.status);

        // Monitor the running stream by its own stream_ ID
        const stats = await api.getStreamStats(startResult.id);
        console.log('Processed records:', stats.source.counter);

        // Pause the stream
        const pauseResult = await api.pauseStream(startResult.id);
        console.log('Stream paused:', pauseResult.status);

        // Resume the stream
        const resumeResult = await api.resumeStream(startResult.id);
        console.log('Stream resumed:', resumeResult.status);
    } catch (error) {
        console.error('Operation failed:', error);
    }
}

main();

Using PowerShell

A PowerShell class implementation with strong typing:

powershell
class DBConvertStreamsAPI {
    [string]$BaseURL
    [hashtable]$Headers

    # PowerShell class constructors can't declare default parameter values,
    # so an overload supplies the default base URL.
    DBConvertStreamsAPI([string]$ApiKey) {
        $this.Init($ApiKey, "http://127.0.0.1:8020/api/v1")
    }

    DBConvertStreamsAPI([string]$ApiKey, [string]$BaseURL) {
        $this.Init($ApiKey, $BaseURL)
    }

    hidden [void] Init([string]$ApiKey, [string]$BaseURL) {
        $this.BaseURL = $BaseURL
        $this.Headers = @{
            "X-API-Key" = $ApiKey
            "Content-Type" = "application/json"
        }
    }

    [bool] LoadUserConfigs() {
        try {
            Invoke-RestMethod `
                -Uri "$($this.BaseURL)/user/configs" `
                -Headers $this.Headers `
                -Method Get
            return $true
        }
        catch {
            Write-Error "Error loading user configurations: $_"
            return $false
        }
    }

    [object] CreateStreamConfig([hashtable]$ConfigData) {
        try {
            $body = $ConfigData | ConvertTo-Json -Depth 10
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/stream-configs" `
                -Headers $this.Headers `
                -Method Post `
                -Body $body
            return $response
        }
        catch {
            Write-Error "Error creating stream configuration: $_"
            return $null
        }
    }

    [object] StartStream([string]$ConfigId) {
        # Starting from a configuration ID returns a new stream instance
        try {
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/streams/$ConfigId/start" `
                -Headers $this.Headers `
                -Method Post
            return $response
        }
        catch {
            Write-Error "Error starting stream: $_"
            return $null
        }
    }

    [object] GetStreamStats([string]$StreamId) {
        try {
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/streams/$StreamId/stats" `
                -Headers $this.Headers `
                -Method Get
            return $response
        }
        catch {
            Write-Error "Error getting stream statistics: $_"
            return $null
        }
    }

    [object] PauseStream([string]$StreamId) {
        try {
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/streams/$StreamId/pause" `
                -Headers $this.Headers `
                -Method Post
            return $response
        }
        catch {
            Write-Error "Error pausing stream: $_"
            return $null
        }
    }

    [object] ResumeStream([string]$StreamId) {
        try {
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/streams/$StreamId/resume" `
                -Headers $this.Headers `
                -Method Post
            return $response
        }
        catch {
            Write-Error "Error resuming stream: $_"
            return $null
        }
    }

    [object] StopStream([string]$StreamId) {
        try {
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/streams/$StreamId/stop" `
                -Headers $this.Headers `
                -Method Post
            return $response
        }
        catch {
            Write-Error "Error stopping stream: $_"
            return $null
        }
    }
}

# Usage example
$api = [DBConvertStreamsAPI]::new("your_api_key_here")

# Load user configurations (REQUIRED)
if (-not $api.LoadUserConfigs()) {
    Write-Error "Failed to load user configurations"
    return
}

# Create a stream configuration
$streamConfig = @{
    name = "mysql_to_postgresql"
    source = "conn_source_id"
    target = "conn_target_id"
    mode = "cdc"
    dataBundleSize = 40
    reportingIntervals = @{
        source = 3
        target = 3
    }
    tables = @(
        @{
            name = "users"
            operations = @("insert", "update", "delete")
        }
    )
    createStructure = $true
}

# Create the stream configuration (returns a config_ ID)
$newStream = $api.CreateStreamConfig($streamConfig)
if ($newStream) {
    Write-Host "Created stream configuration: $($newStream.id)"

    # Start a stream from the configuration (returns a new stream_ instance)
    $startResult = $api.StartStream($newStream.id)
    if ($startResult) {
        Write-Host "Stream started: $($startResult.status)"

        # Monitor the running stream by its own stream_ ID
        $stats = $api.GetStreamStats($startResult.id)
        if ($stats) {
            Write-Host "Processed records: $($stats.source.counter)"

            # Pause the stream
            $pauseResult = $api.PauseStream($startResult.id)
            if ($pauseResult) {
                Write-Host "Stream paused: $($pauseResult.status)"

                # Resume the stream
                $resumeResult = $api.ResumeStream($startResult.id)
                if ($resumeResult) {
                    Write-Host "Stream resumed: $($resumeResult.status)"
                }
            }
        }
    }
}

Error Handling

Each example includes comprehensive error handling:

  • All examples catch and handle network errors
  • Response status codes are checked
  • Error messages are logged with details
  • Type safety is enforced where possible

Common error scenarios (handled in the sketch below):

  • 401: Invalid or missing API key
  • 404: Stream not found
  • 409: Cannot start stream (e.g., already running)
  • 503: Service unavailable
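
For example, in Python these status codes can be turned into actionable messages when a request fails. The helper below is a sketch; its name and message mapping are illustrative:

python
import requests
from typing import Any, Dict, Optional

# Illustrative mapping of common DBConvert Streams API error codes
ERROR_HINTS = {
    401: "Invalid or missing API key",
    404: "Stream not found",
    409: "Cannot start stream (e.g., already running)",
    503: "Service unavailable",
}

def start_stream_safely(base_url: str, api_key: str, config_id: str) -> Optional[Dict[str, Any]]:
    """Start a stream and report common failures in readable form."""
    try:
        response = requests.post(
            f"{base_url}/streams/{config_id}/start",
            headers={"X-API-Key": api_key},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as exc:
        status = exc.response.status_code
        print(f"HTTP {status}: {ERROR_HINTS.get(status, 'Unexpected error')}")
    except requests.exceptions.ConnectionError:
        print("Network error: is the API server running?")
    return None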

Next Steps

For the full list of endpoints, parameters, and response formats, see the Stream API Reference and the Connection API Reference.