Java Code: Salesforce Bulk API Loading from MySQL

Creating a bulk sync between MySQL and Salesforce using the Salesforce Bulk API. I created a BulkLoader class that connects to MySQL, exports the query results to a CSV file, and imports that file into Salesforce. In this example I am syncing the Lead object. The code uses the OpenCSV library for the CSV export and the Salesforce Web Service Connector (partner and async APIs) for the upload, so those jars plus the MySQL Connector/J driver need to be on the classpath.
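The Bulk API maps the CSV header names to Lead field API names; as the output below shows, the all-caps MySQL column names still resolve to the LastName, Company, Status and Email fields, since header matching is not case-sensitive. The exported file should look something like this (values are made up for illustration):

"LASTNAME","COMPANY","STATUS","EMAIL"
"Doe","Acme Corp","Open - Not Contacted","jdoe@example.com"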

package com.thysmichels;

import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;

import au.com.bytecode.opencsv.CSVWriter;

import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

public class BulkLoader {

  // Salesforce.com credentials
  private String userName = "yourusername";
  private String password = "yourpasswordandsecuritytoken";
  // the sObject being uploaded
  private String sObject = "Lead";
  // the CSV file being uploaded - either manually or from MySQL resultset
  // ie /Users/Me/Desktop/BulkAPI/myfile.csv
  private String csvFileName = "/Users/tmichels/Documents/lead.csv";
  // MySQL connection URL
  private String mySqlUrl = "jdbc:mysql://localhost/RLSFDCArchive?user=admin&password=admin";
  // the query that returns results from MySQL
  private String mySqlQuery = "SELECT LASTNAME, COMPANY, STATUS, EMAIL FROM RLSFDCArchive.LEAD LIMIT 10";

  public static void main(String[] args) throws AsyncApiException, ConnectionException, IOException {
    BulkLoader example = new BulkLoader();
    example.run();
  }

  public void run() throws AsyncApiException, ConnectionException, IOException {
    if (createCSVFromMySQL()) {
      System.out.println("Submitting CSV file to Salesforce.com");
      runJob(sObject, userName, password, csvFileName);
    }
  }

  private boolean createCSVFromMySQL() {

    System.out.println("Fetching records from MySQL");

    Connection conn = null;
    ResultSet rs = null;
    boolean success = false;

    try {
      // load the MySQL JDBC driver and open a connection
      Class.forName("com.mysql.jdbc.Driver");
      conn = DriverManager.getConnection(mySqlUrl);

      Statement s = conn.createStatement();
      rs = s.executeQuery(mySqlQuery);

      // write the result set to the CSV file, including the header row
      CSVWriter writer = new CSVWriter(new FileWriter(csvFileName), ',');
      writer.writeAll(rs, true);
      writer.close();
      System.out.println("Successfully fetched records from MySQL");
      success = true;

    } catch (Exception e) {
      System.err.println("Failed to export records from MySQL: " + e.getMessage());
      success = false;
    } finally {
      if (rs != null) {
        try {
          rs.close();
          System.out.println("Resultset terminated");
        } catch (Exception e1) { /* ignore close errors */
        }
      }
      if (conn != null) {
        try {
          conn.close();
          System.out.println("Database connection terminated");
        } catch (Exception e2) { /* ignore close errors */
        }
      }

    }
    return success;

  }

  public void runJob(String sobjectType, String userName, String password,
      String sampleFileName) throws AsyncApiException, ConnectionException,
      IOException {
    BulkConnection connection = getBulkConnection(userName, password);
    JobInfo job = createJob(sobjectType, connection);
    List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job, sampleFileName);
    closeJob(connection, job.getId());
    awaitCompletion(connection, job, batchInfoList);
    checkResults(connection, job, batchInfoList);
  }

  private void awaitCompletion(BulkConnection connection, JobInfo job,
      List<BatchInfo> batchInfoList) throws AsyncApiException {
    long sleepTime = 0L;
    Set<String> incomplete = new HashSet<String>();
    for (BatchInfo bi : batchInfoList) {
      incomplete.add(bi.getId());
    }
    while (!incomplete.isEmpty()) {
      try {
        Thread.sleep(sleepTime);
      } catch (InterruptedException e) { /* ignore and keep polling */
      }
      System.out.println("Awaiting results..." + incomplete.size());
      sleepTime = 10000L;
      BatchInfo[] statusList = connection.getBatchInfoList(job.getId())
          .getBatchInfo();
      for (BatchInfo b : statusList) {
        if (b.getState() == BatchStateEnum.Completed
            || b.getState() == BatchStateEnum.Failed) {
          if (incomplete.remove(b.getId())) {
            System.out.println("BATCH STATUS:\n" + b);
          }
        }
      }
    }
  }

  private void checkResults(BulkConnection connection, JobInfo job,
      List<BatchInfo> batchInfoList) throws AsyncApiException, IOException {
    // batchInfoList was populated when batches were created and submitted
    for (BatchInfo b : batchInfoList) {
      CSVReader rdr = new CSVReader(connection.getBatchResultStream(
          job.getId(), b.getId()));
      List<String> resultHeader = rdr.nextRecord();
      int resultCols = resultHeader.size();

      List<String> row;
      while ((row = rdr.nextRecord()) != null) {
        Map<String, String> resultInfo = new HashMap<String, String>();
        for (int i = 0; i < resultCols; i++) {
          resultInfo.put(resultHeader.get(i), row.get(i));
        }
        boolean success = Boolean.valueOf(resultInfo.get("Success"));
        boolean created = Boolean.valueOf(resultInfo.get("Created"));
        String id = resultInfo.get("Id");
        String error = resultInfo.get("Error");
        if (success && created) {
          System.out.println("Created row with id " + id);
        } else if (!success) {
          System.out.println("Failed with error: " + error);
        }
      }
    }
  }

  private void closeJob(BulkConnection connection, String jobId)
      throws AsyncApiException {
    JobInfo job = new JobInfo();
    job.setId(jobId);
    job.setState(JobStateEnum.Closed);
    connection.updateJob(job);
  }

  private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
      JobInfo jobInfo, String csvFileName) throws IOException,
      AsyncApiException {
    List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
    BufferedReader rdr = new BufferedReader(new InputStreamReader(
        new FileInputStream(csvFileName)));
    // read the CSV header row
    byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
    int headerBytesLength = headerBytes.length;
    File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");

    // Split the CSV file into multiple batches
    try {
      FileOutputStream tmpOut = new FileOutputStream(tmpFile);
      int maxBytesPerBatch = 10000000; // Bulk API limit: 10 MB per batch
      int maxRowsPerBatch = 10000; // Bulk API limit: 10,000 records per batch
      int currentBytes = 0;
      int currentLines = 0;
      String nextLine;
      while ((nextLine = rdr.readLine()) != null) {
        byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
        // Create a new batch when our batch size limit is reached
        if (currentBytes + bytes.length > maxBytesPerBatch
            || currentLines > maxRowsPerBatch) {
          createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
          currentBytes = 0;
          currentLines = 0;
        }
        if (currentBytes == 0) {
          tmpOut = new FileOutputStream(tmpFile);
          tmpOut.write(headerBytes);
          currentBytes = headerBytesLength;
          currentLines = 1;
        }
        tmpOut.write(bytes);
        currentBytes += bytes.length;
        currentLines++;
      }
      // Finished processing all rows
      // Create a final batch for any remaining data
      if (currentLines > 1) {
        createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
      }
    } finally {
      rdr.close();
      tmpFile.delete();
    }
    return batchInfos;
  }

  private void createBatch(FileOutputStream tmpOut, File tmpFile,
      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
      throws IOException, AsyncApiException {
    tmpOut.flush();
    tmpOut.close();
    FileInputStream tmpInputStream = new FileInputStream(tmpFile);
    try {
      BatchInfo batchInfo = connection.createBatchFromStream(jobInfo,
          tmpInputStream);
      System.out.println(batchInfo);
      batchInfos.add(batchInfo);

    } finally {
      tmpInputStream.close();
    }
  }

  private BulkConnection getBulkConnection(String userName, String password)
      throws ConnectionException, AsyncApiException {
    ConnectorConfig partnerConfig = new ConnectorConfig();
    partnerConfig.setUsername(userName);
    partnerConfig.setPassword(password);
    partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/25.0");
    // Creating the connection automatically handles login and stores
    // the session in partnerConfig
    new PartnerConnection(partnerConfig);
    // When PartnerConnection is instantiated, a login is implicitly
    // executed and, if successful,
    // a valid session is stored in the ConnectorConfig instance.
    // Use this key to initialize a RestConnection:
    ConnectorConfig config = new ConnectorConfig();
    config.setSessionId(partnerConfig.getSessionId());
    // The Bulk API endpoint is the same as the SOAP endpoint up to the
    // /Soap/ part; from there it is 'async/versionNumber'
    String soapEndpoint = partnerConfig.getServiceEndpoint();
    String apiVersion = "25.0";
    String restEndpoint = soapEndpoint.substring(0, soapEndpoint
        .indexOf("Soap/"))
        + "async/" + apiVersion;
    config.setRestEndpoint(restEndpoint);
    // This should only be set to false while debugging.
    config.setCompression(true);
    // Set this to true to see HTTP requests and responses on stdout
    config.setTraceMessage(false);
    BulkConnection connection = new BulkConnection(config);
    return connection;
  }

  private JobInfo createJob(String sobjectType, BulkConnection connection)
      throws AsyncApiException {
    JobInfo job = new JobInfo();
    job.setObject(sobjectType);
    job.setOperation(OperationEnum.insert);
    job.setContentType(ContentType.CSV);
    job = connection.createJob(job);
    System.out.println(job);
    return job;
  }

}
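
Note that the job always runs an insert, so executing the sync twice will create duplicate Leads. For a repeatable sync the job could be switched to an upsert keyed on an external ID. Here is a minimal sketch, assuming a hypothetical MySQL_Id__c custom external ID field on Lead that stores the MySQL primary key (the exported CSV would need a matching column):

  private JobInfo createUpsertJob(String sobjectType, BulkConnection connection)
      throws AsyncApiException {
    JobInfo job = new JobInfo();
    job.setObject(sobjectType);
    // upsert so that re-running the sync updates existing records
    // instead of inserting duplicates
    job.setOperation(OperationEnum.upsert);
    // hypothetical custom external ID field holding the MySQL primary key
    job.setExternalIdFieldName("MySQL_Id__c");
    job.setContentType(ContentType.CSV);
    job = connection.createJob(job);
    return job;
  }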

Below is the expected output:

Fetching records from MySQL
Successfully fetched records from MySQL
Resultset terminated
Database connection terminated
Submitting CSV file to Salesforce.com
[JobInfo  id='75060000000LtROAA0'
 operation='insert'
 object='Lead'
 createdById='00560000001mciHAAQ'
 createdDate='java.util.GregorianCalendar[time=1357172058000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=18,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]'
 systemModstamp='java.util.GregorianCalendar[time=1357172058000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=18,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]'
 state='Open'
 externalIdFieldName='null'
 concurrencyMode='Parallel'
 contentType='CSV'
 numberBatchesQueued='0'
 numberBatchesInProgress='0'
 numberBatchesCompleted='0'
 numberBatchesFailed='0'
 numberBatchesTotal='0'
 numberRecordsProcessed='0'
 numberRetries='0'
 apiVersion='25.0'
 assignmentRuleId='null'
 numberRecordsFailed='0'
 totalProcessingTime='0'
 apiActiveProcessingTime='0'
 apexProcessingTime='0'
]

[BatchInfo  id='75160000000sEGGAA2'
 jobId='75060000000LtROAA0'
 state='Queued'
 stateMessage='null'
 createdDate='java.util.GregorianCalendar[time=1357172059000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=19,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]'
 systemModstamp='java.util.GregorianCalendar[time=1357172059000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=19,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]'
 numberRecordsProcessed='0'
 numberRecordsFailed='0'
 totalProcessingTime='0'
 apiActiveProcessingTime='0'
 apexProcessingTime='0'
]

Awaiting results...1
Awaiting results...1
BATCH STATUS:
[BatchInfo  id='75160000000sEGGAA2'
 jobId='75060000000LtROAA0'
 state='Completed'
 stateMessage='null'
 createdDate='java.util.GregorianCalendar[time=1357172059000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=19,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]'
 systemModstamp='java.util.GregorianCalendar[time=1357172060000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=20,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]'
 numberRecordsProcessed='10'
 numberRecordsFailed='1'
 totalProcessingTime='753'
 apiActiveProcessingTime='648'
 apexProcessingTime='517'
]
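
Note that the final batch reports numberRecordsProcessed='10' but numberRecordsFailed='1'. The checkResults method surfaces the per-row outcome by reading the batch result stream, which is a CSV with Id, Success, Created and Error columns, along these lines (values are illustrative, not taken from the run above):

"Id","Success","Created","Error"
"00Q6000000aBcDeEAF","true","true",""
"","false","false","REQUIRED_FIELD_MISSING:Required fields are missing: [Company]:Company --"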
