Creating a bulk sync between MySQL and Salesforce using the Salesforce Bulk API. I created a Bulk Loader that connects to MySQL exports data to CSV and imports it into Salesforce. In this example I am syncing the Lead object.
package com.thysmichels; import java.io.*; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.*; import au.com.bytecode.opencsv.CSVWriter; import com.sforce.async.*; import com.sforce.soap.partner.PartnerConnection; import com.sforce.ws.ConnectionException; import com.sforce.ws.ConnectorConfig; public class BulkLoader { // Salesforce.com credentials private String userName = "yourusername"; private String password = "yourpasswordandsecuritytoken"; // the sObject being uploaded private String sObject = "Lead"; // the CSV file being uploaded - either manually or from MySQL resultset // ie /Users/Me/Desktop/BulkAPI/myfile.csv private String csvFileName = "/Users/tmichels/Documents/lead.csv"; // MySQL connection URL String mySqlUrl = "jdbc:mysql://localhost/RLSFDCArchive?user=admin&password=admin"; // the query that returns results from MySQL String mySqlQuery = "SELECT LASTNAME, COMPANY, STATUS, EMAIL FROM RLSFDCArchive.LEAD limit 10"; private BufferedReader console = null; public static void main(String[] args) throws AsyncApiException, ConnectionException, IOException { BulkLoader example = new BulkLoader(); example.run(); } public void run() throws AsyncApiException, ConnectionException, IOException { if (createCSVFromMySQL()) { System.out.println("Submitting CSV file to Salesforce.com"); runJob(sObject, userName, password, csvFileName); } } private boolean createCSVFromMySQL() { System.out.println("Fetching records from MySQL"); Connection conn = null; ResultSet rs = null; boolean success = false; try { Class.forName("com.mysql.jdbc.Driver").newInstance(); conn = DriverManager.getConnection(mySqlUrl); Statement s = conn.createStatement (); s.executeQuery(mySqlQuery); rs = s.getResultSet(); // dump the contents to the console //System.out.println(rs.getMetaData().getColumnName(1)); /* while (rs.next ()){ String LnVal = rs.getString ("LASTNAME"); String CompanyVal = rs.getString ("COMPANY"); System.out.println ("LastName = "+LnVal+", Company = "+CompanyVal); } */ // write the result set to the CSV file if (rs != null) { CSVWriter writer = new CSVWriter(new FileWriter(csvFileName), ','); writer.writeAll(rs, true); writer.close(); System.out.println("Successfully fetched records from MySQL"); success = true; } } catch (Exception e) { System.err.println("Cannot connect to database server"); success = false; } finally { if (rs != null) { try { rs.close(); System.out.println("Resultset terminated"); } catch (Exception e1) { /* ignore close errors */ } } if (conn != null) { try { conn.close(); System.out.println("Database connection terminated"); } catch (Exception e2) { /* ignore close errors */ } } } return success; } public void runJob(String sobjectType, String userName, String password, String sampleFileName) throws AsyncApiException, ConnectionException, IOException { BulkConnection connection = getBulkConnection(userName, password); JobInfo job = createJob(sobjectType, connection); List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job, sampleFileName); closeJob(connection, job.getId()); awaitCompletion(connection, job, batchInfoList); checkResults(connection, job, batchInfoList); } private void awaitCompletion(BulkConnection connection, JobInfo job, List<BatchInfo> batchInfoList) throws AsyncApiException { long sleepTime = 0L; Set<String> incomplete = new HashSet<String>(); for (BatchInfo bi : batchInfoList) { incomplete.add(bi.getId()); } while (!incomplete.isEmpty()) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { } System.out.println("Awaiting results..." + incomplete.size()); sleepTime = 10000L; BatchInfo[] statusList = connection.getBatchInfoList(job.getId()) .getBatchInfo(); for (BatchInfo b : statusList) { if (b.getState() == BatchStateEnum.Completed || b.getState() == BatchStateEnum.Failed) { if (incomplete.remove(b.getId())) { System.out.println("BATCH STATUS:\n" + b); } } } } } private void checkResults(BulkConnection connection, JobInfo job, List<BatchInfo> batchInfoList) throws AsyncApiException, IOException { // batchInfoList was populated when batches were created and submitted for (BatchInfo b : batchInfoList) { CSVReader rdr = new CSVReader(connection.getBatchResultStream( job.getId(), b.getId())); List<String> resultHeader = rdr.nextRecord(); int resultCols = resultHeader.size(); List<String> row; while ((row = rdr.nextRecord()) != null) { Map<String, String> resultInfo = new HashMap<String, String>(); for (int i = 0; i < resultCols; i++) { resultInfo.put(resultHeader.get(i), row.get(i)); } boolean success = Boolean.valueOf(resultInfo.get("Success")); boolean created = Boolean.valueOf(resultInfo.get("Created")); String id = resultInfo.get("Id"); String error = resultInfo.get("Error"); if (success && created) { System.out.println("Created row with id " + id); } else if (!success) { System.out.println("Failed with error: " + error); } } } } private void closeJob(BulkConnection connection, String jobId) throws AsyncApiException { JobInfo job = new JobInfo(); job.setId(jobId); job.setState(JobStateEnum.Closed); connection.updateJob(job); } private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection, JobInfo jobInfo, String csvFileName) throws IOException, AsyncApiException { List<BatchInfo> batchInfos = new ArrayList<BatchInfo>(); BufferedReader rdr = new BufferedReader(new InputStreamReader( new FileInputStream(csvFileName))); // read the CSV header row byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8"); int headerBytesLength = headerBytes.length; File tmpFile = File.createTempFile("bulkAPIInsert", ".csv"); // Split the CSV file into multiple batches try { FileOutputStream tmpOut = new FileOutputStream(tmpFile); int maxBytesPerBatch = 10000000; // 10 million bytes per batch int maxRowsPerBatch = 10000; // 10 thousand rows per batch int currentBytes = 0; int currentLines = 0; String nextLine; while ((nextLine = rdr.readLine()) != null) { byte[] bytes = (nextLine + "\n").getBytes("UTF-8"); // Create a new batch when our batch size limit is reached if (currentBytes + bytes.length > maxBytesPerBatch || currentLines > maxRowsPerBatch) { createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo); currentBytes = 0; currentLines = 0; } if (currentBytes == 0) { tmpOut = new FileOutputStream(tmpFile); tmpOut.write(headerBytes); currentBytes = headerBytesLength; currentLines = 1; } tmpOut.write(bytes); currentBytes += bytes.length; currentLines++; } // Finished processing all rows // Create a final batch for any remaining data if (currentLines > 1) { createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo); } } finally { tmpFile.delete(); } return batchInfos; } private void createBatch(FileOutputStream tmpOut, File tmpFile, List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo) throws IOException, AsyncApiException { tmpOut.flush(); tmpOut.close(); FileInputStream tmpInputStream = new FileInputStream(tmpFile); try { BatchInfo batchInfo = connection.createBatchFromStream(jobInfo, tmpInputStream); System.out.println(batchInfo); batchInfos.add(batchInfo); } finally { tmpInputStream.close(); } } private BulkConnection getBulkConnection(String userName, String password) throws ConnectionException, AsyncApiException { ConnectorConfig partnerConfig = new ConnectorConfig(); partnerConfig.setUsername(userName); partnerConfig.setPassword(password); partnerConfig.setAuthEndpoint("https://www.salesforce.com/services/Soap/u/25.0"); // Creating the connection automatically handles login and stores // the session in partnerConfig new PartnerConnection(partnerConfig); // When PartnerConnection is instantiated, a login is implicitly // executed and, if successful, // a valid session is stored in the ConnectorConfig instance. // Use this key to initialize a RestConnection: ConnectorConfig config = new ConnectorConfig(); config.setSessionId(partnerConfig.getSessionId()); // The endpoint for the Bulk API service is the same as for the normal // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber' String soapEndpoint = partnerConfig.getServiceEndpoint(); String apiVersion = "25.0"; String restEndpoint = soapEndpoint.substring(0, soapEndpoint .indexOf("Soap/")) + "async/" + apiVersion; config.setRestEndpoint(restEndpoint); // This should only be false when doing debugging. config.setCompression(true); // Set this to true to see HTTP requests and responses on stdout config.setTraceMessage(false); BulkConnection connection = new BulkConnection(config); return connection; } private JobInfo createJob(String sobjectType, BulkConnection connection) throws AsyncApiException { JobInfo job = new JobInfo(); job.setObject(sobjectType); job.setOperation(OperationEnum.insert); job.setContentType(ContentType.CSV); job = connection.createJob(job); System.out.println(job); return job; } }
Below is the output expected:
Fetching records from MySQL Successfully fetched records from MySQL Resultset terminated Database connection terminated Submitting CSV file to Salesforce.com [JobInfo id='75060000000LtROAA0' operation='insert' object='Lead' createdById='00560000001mciHAAQ' createdDate='java.util.GregorianCalendar[time=1357172058000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=18,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]' systemModstamp='java.util.GregorianCalendar[time=1357172058000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=18,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]' state='Open' externalIdFieldName='null' concurrencyMode='Parallel' contentType='CSV' numberBatchesQueued='0' numberBatchesInProgress='0' numberBatchesCompleted='0' numberBatchesFailed='0' numberBatchesTotal='0' numberRecordsProcessed='0' numberRetries='0' apiVersion='25.0' assignmentRuleId='null' numberRecordsFailed='0' totalProcessingTime='0' apiActiveProcessingTime='0' apexProcessingTime='0' ] [BatchInfo id='75160000000sEGGAA2' jobId='75060000000LtROAA0' state='Queued' stateMessage='null' createdDate='java.util.GregorianCalendar[time=1357172059000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=19,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]' systemModstamp='java.util.GregorianCalendar[time=1357172059000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=19,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]' numberRecordsProcessed='0' numberRecordsFailed='0' totalProcessingTime='0' apiActiveProcessingTime='0' apexProcessingTime='0' ] Awaiting results...1 Awaiting results...1 BATCH STATUS: [BatchInfo id='75160000000sEGGAA2' jobId='75060000000LtROAA0' state='Completed' stateMessage='null' createdDate='java.util.GregorianCalendar[time=1357172059000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=19,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]' systemModstamp='java.util.GregorianCalendar[time=1357172060000,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="GMT",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2013,MONTH=0,WEEK_OF_YEAR=1,WEEK_OF_MONTH=1,DAY_OF_MONTH=3,DAY_OF_YEAR=3,DAY_OF_WEEK=5,DAY_OF_WEEK_IN_MONTH=1,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=14,SECOND=20,MILLISECOND=0,ZONE_OFFSET=0,DST_OFFSET=0]' numberRecordsProcessed='10' numberRecordsFailed='1' totalProcessingTime='753' apiActiveProcessingTime='648' apexProcessingTime='517' ]
Thank you so much, You did a really amazing job.
BTW, I am working as the developer in Skyvia. The service that allows you to make synchronization between Salesforce and MySQL just in few clicks. Also, it is a completely no-coding solution.
If it sounds interesting to you, just follow the link: https://skyvia.com/data-integration/integrate-salesforce-mysq