Skip to content

Getting Started

This guide walks you through setting up the ADP Sync Agent with dependency injection and building your first data synchronization pipeline.

Installation

Install the NuGet package:

dotnet add package ShiftSoftware.ADP.SyncAgent

Service Registration

The Sync Agent provides extension methods on IServiceCollection to register its components:

// Register the Sync Agent components on the application's IServiceCollection.
services.AddSyncService();                  // Registers SyncEngine<T> and SyncEngine<TSource, TDestination>
services.AddEFCoreSyncDataSource();         // Registers EFCoreSyncDataSource adapters
services.AddEFCoreSyncDataDestination();    // Registers EFCoreSyncDataDestination adapters
services.AddCosmosSyncDataDestination();    // Registers CosmosSyncDataDestination adapters

// For CSV-based pipelines
// NOTE: the paths below are sample Windows paths; replace them with directories
// that exist in your environment.
services.AddCSVSyncDataSource<FileSystemStorageService>(options =>
{
    // NOTE(review): presumably a scratch folder used while comparing CSV versions — confirm.
    options.CompareWorkingDirectory = @"C:\temp\csv-compare";
    options.SourceBasePath = @"C:\data\incoming";
    options.DestinationBasePath = @"C:\data\synced";
});

Example 1: EF Core to Cosmos DB

A common scenario — reading from a SQL Server database and syncing to Azure Cosmos DB.

/// <summary>
/// Sync job that reads <c>PartEntity</c> rows through EF Core and writes them
/// to Azure Cosmos DB as <c>PartCosmosDoc</c> documents.
/// </summary>
public class PartSyncJob
{
    private readonly IServiceProvider _services;
    private readonly ILogger<PartSyncJob> _logger;

    public PartSyncJob(IServiceProvider services, ILogger<PartSyncJob> logger)
        => (_services, _logger) = (services, logger);

    /// <summary>
    /// Builds the pipeline (engine, source adapter, destination adapter, mapping) and runs it.
    /// </summary>
    /// <returns>The value returned by the engine's <c>RunAsync</c> call.</returns>
    public async Task<bool> RunAsync()
    {
        // 1. Create the engine; `await using` disposes it when this method exits.
        await using var syncEngine = new SyncEngine<PartEntity, PartCosmosDoc>(_services);

        // 2. Engine-level settings.
        syncEngine.Configure(batchSize: 500, maxRetryCount: 3, operationTimeoutInSeconds: 600);

        // 3. Route engine log output to the injected ILogger.
        syncEngine.RegisterLogger(new SyncEngineILogger(_logger));

        // 4. EF Core source adapter (attached before the destination adapter).
        var efSource = syncEngine
            .SetDataAddapter<EFCoreSyncDataSource<PartEntity, AppDbContext>>(_services);

        var efSourceSettings = new EFCoreSyncDataSourceConfigurations<PartEntity>
        {
            // Select rows that were never synced or were modified after their last sync.
            Query = (query, actionType) => query
                .Where(p => p.LastSyncedAt == null || p.LastModified > p.LastSyncedAt),
            EntityKey = p => p.Id,
            SyncTimestamp = p => p.LastSyncedAt,
        };
        efSource.Configure(efSourceSettings);

        // 5. Cosmos DB destination adapter.
        var cosmosDestination = syncEngine
            .SetDataAddapter<CosmosSyncDataDestination<PartEntity, PartCosmosDoc, CosmosClient>>(_services);

        var cosmosSettings = new CosmosSyncDataDestinationConfigurations<PartEntity, PartCosmosDoc>
        {
            DatabaseId = "PartsDB",
            ContainerId = "Parts",
            PartitionKeyLevel1Expression = x => x.Region,
        };
        cosmosDestination.Configure(cosmosSettings);

        // 6. Map each source entity to its Cosmos document; null items stay null.
        syncEngine.SetupMapping(async (items, actionType) =>
            items?.Select(part => part is null ? null : new PartCosmosDoc
            {
                id = part.Id.ToString(),
                PartNumber = part.PartNumber,
                Description = part.Description,
                Price = part.Price,
                Region = part.Region,
            }));

        // 7. Run the pipeline and report its outcome.
        var succeeded = await syncEngine.RunAsync();
        return succeeded;
    }
}

Example 2: CSV to Cosmos DB

Syncing data from CSV flat files to Azure Cosmos DB — common when integrating with dealer management systems that export data as files.

/// <summary>
/// Sync job that reads <c>PriceRecord</c> rows from a CSV file and writes them
/// to Azure Cosmos DB as <c>PriceCosmosDoc</c> documents.
/// </summary>
public class PriceSyncJob
{
    private readonly IServiceProvider _services;
    private readonly ILogger<PriceSyncJob> _logger;

    public PriceSyncJob(IServiceProvider services, ILogger<PriceSyncJob> logger)
        => (_services, _logger) = (services, logger);

    /// <summary>
    /// Builds the CSV-to-Cosmos pipeline and runs it.
    /// </summary>
    /// <returns>The value returned by the engine's <c>RunAsync</c> call.</returns>
    public async Task<bool> RunAsync()
    {
        // `await using` disposes the engine when this method exits.
        await using var syncEngine = new SyncEngine<PriceRecord, PriceCosmosDoc>(_services);

        // Engine-level settings; the action list is passed in the order Delete, Add.
        syncEngine.Configure(
            actionExecutionAndOrder: [SyncActionType.Delete, SyncActionType.Add],
            batchSize: 1000,
            maxRetryCount: 2,
            operationTimeoutInSeconds: 900);

        // Route engine log output to the injected ILogger.
        syncEngine.RegisterLogger(new SyncEngineILogger(_logger));

        // CSV source adapter (CsvHelper-based), attached before the destination adapter.
        var csvSource = syncEngine
            .SetDataAddapter<CsvHelperCsvSyncDataSource<PriceRecord, PriceCosmosDoc>>(_services);

        var csvSettings = new CSVSyncDataSourceConfigurations<PriceRecord>
        {
            CSVFileName = "prices.csv",
            SourceDirectory = "dealer-exports",
            DestinationDirectory = "synced",
            SkipReorderedLines = true,
            HasHeaderRecord = true,
        };
        csvSource.Configure(csvSettings);

        // Cosmos DB destination adapter.
        var cosmosDestination = syncEngine
            .SetDataAddapter<CosmosSyncDataDestination<PriceRecord, PriceCosmosDoc, CosmosClient>>(_services);

        var cosmosSettings = new CosmosSyncDataDestinationConfigurations<PriceRecord, PriceCosmosDoc>
        {
            DatabaseId = "PricingDB",
            ContainerId = "Prices",
            PartitionKeyLevel1Expression = x => x.Region,
        };
        cosmosDestination.Configure(cosmosSettings);

        // Map each CSV record to its Cosmos document; null items stay null.
        syncEngine.SetupMapping(async (items, actionType) =>
            items?.Select(record => record is null ? null : new PriceCosmosDoc
            {
                id = record.PartNumber,
                PartNumber = record.PartNumber,
                Price = record.Price,
                Currency = record.Currency,
                Region = record.Region,
            }));

        var succeeded = await syncEngine.RunAsync();
        return succeeded;
    }
}

Example 3: EF Core to EF Core (Database to Database)

Syncing data between two SQL Server databases — useful for replicating data across environments or aggregating dealer data.

/// <summary>
/// Sync job that copies <c>OrderEntity</c> rows from <c>DealerDbContext</c>
/// to <c>CentralDbContext</c> using EF Core on both sides.
/// </summary>
public class OrderSyncJob
{
    private readonly IServiceProvider _services;

    public OrderSyncJob(IServiceProvider services)
    {
        _services = services;
    }

    /// <summary>
    /// Builds the database-to-database pipeline and runs it.
    /// </summary>
    /// <returns>The value returned by the engine's <c>RunAsync</c> call.</returns>
    public async Task<bool> RunAsync()
    {
        // `await using` disposes the engine when this method exits.
        await using var syncEngine = new SyncEngine<OrderEntity>(_services);

        syncEngine.Configure(batchSize: 200, maxRetryCount: 1);

        // Source adapter: reads from DealerDbContext (attached before the destination adapter).
        var dealerSource = syncEngine
            .SetDataAddapter<EFCoreSyncDataSource<OrderEntity, DealerDbContext>>(_services);

        var dealerSourceSettings = new EFCoreSyncDataSourceConfigurations<OrderEntity>
        {
            // Restrict the query to orders whose Status equals "Completed".
            Query = (query, actionType) => query.Where(o => o.Status == "Completed"),
            EntityKey = o => o.OrderId,
        };
        dealerSource.Configure(dealerSourceSettings);

        // Destination adapter: writes to CentralDbContext.
        var centralDestination = syncEngine
            .SetDataAddapter<EFCoreSyncDataDestination<OrderEntity, CentralDbContext>>(_services);

        var centralSettings = new EFCoreSyncDataDestinationConfigurations();
        centralDestination.Configure(centralSettings);

        // Source and destination share one type, so the mapping returns the items unchanged.
        syncEngine.SetupMapping(async (items, actionType) =>
        {
            return items;
        });

        var succeeded = await syncEngine.RunAsync();
        return succeeded;
    }
}

Example 4: EF Core to DuckDB

Syncing operational data into DuckDB for analytics and reporting.

/// <summary>
/// Sync job that projects <c>SalesEntity</c> rows into <c>SalesAnalyticsRow</c>
/// and writes them to the DuckDB table "Sales".
/// </summary>
public class AnalyticsSyncJob
{
    private readonly IServiceProvider _services;

    public AnalyticsSyncJob(IServiceProvider services)
    {
        _services = services;
    }

    /// <summary>
    /// Builds the EF Core-to-DuckDB pipeline and runs it.
    /// </summary>
    /// <returns>The value returned by the engine's <c>RunAsync</c> call.</returns>
    public async Task<bool> RunAsync()
    {
        // `await using` disposes the engine when this method exits.
        await using var syncEngine = new SyncEngine<SalesEntity, SalesAnalyticsRow>(_services);

        syncEngine.Configure(batchSize: 5000);

        // EF Core source adapter (attached before the destination adapter).
        var salesSource = syncEngine
            .SetDataAddapter<EFCoreSyncDataSource<SalesEntity, SalesAnalyticsRow, SalesDbContext>>(_services);

        var salesSourceSettings = new EFCoreSyncDataSourceConfigurations<SalesEntity, SalesAnalyticsRow>
        {
            // The projection to the analytics row shape happens inside the source query.
            Query = (query, actionType) => query
                .Select(s => new SalesAnalyticsRow
                {
                    SaleId = s.Id,
                    DealerId = s.DealerId,
                    Amount = s.TotalAmount,
                    SaleDate = s.CreatedAt,
                }),
            EntityKey = s => s.Id,
        };
        salesSource.Configure(salesSourceSettings);

        // DuckDB destination adapter.
        var duckDestination = syncEngine
            .SetDataAddapter<DuckDBSyncDataDestination<SalesEntity, SalesAnalyticsRow, DuckDBConnection>>(_services);

        var duckSettings = new DuckDBSyncDataDestinationConfigurations<SalesEntity, SalesAnalyticsRow>
        {
            TableName = "Sales",
            PrimaryKey = x => x.SaleId,
        };
        duckDestination.Configure(duckSettings);

        // The mapping hands the items through unchanged.
        syncEngine.SetupMapping(async (items, actionType) =>
        {
            return items;
        });

        var succeeded = await syncEngine.RunAsync();
        return succeeded;
    }
}

Example 5: Using AutoMapper

The Sync Agent integrates with AutoMapper via the UseAutoMapper extension method:

// Call this in place of engine.SetupMapping(...).
// NOTE(review): `mapper` is presumably an AutoMapper IMapper instance — confirm against the UseAutoMapper signature.
engine.UseAutoMapper(mapper);

This removes the need for a manual SetupMapping call: the mapper transforms TSource items into TDestination items automatically.


Pipeline Triggers

The Sync Agent does not dictate how pipelines are triggered. You can integrate it with any scheduling or event system:

| Trigger | Example |
| --- | --- |
| Scheduled Jobs | Hangfire, Azure Functions (Timer Trigger), Windows Services, cron jobs. |
| Event-Based | Azure Service Bus messages, Azure Blob Storage events, database change feeds. |
| On-Demand | API endpoints, manual UI triggers. |

Tips

Source Before Destination

Always configure the source adapter before the destination adapter. The adapters chain their lifecycle hooks — the destination adapter may depend on hooks already set by the source.

Batch Size Tuning

Start with a moderate batch size (500–1000) and adjust based on your data volume and destination latency. Smaller batches give more granular retries but add per-batch overhead. Larger batches are more efficient, but a single failure affects more data.

Timeout Planning

Set operationTimeoutInSeconds generously for initial syncs or large datasets. For recurring syncs with small deltas, a shorter timeout helps detect stuck pipelines early.

Dispose the Engine

SyncEngine implements IAsyncDisposable. Use await using to ensure all resources (adapters, file handles, connections) are cleaned up after the pipeline completes.