Show TOC

Using Multi Threading

 

If you want to work with multi-threading in your JCo Client, you need the SessionReferenceProvider interface. You can create a simple session reference using this interface in the JCo.

Caution Caution

In a multi-thread environment, distribution of objects (for example, JCoTable objects) between different threads must be implemented carefully. Note that it is not possible to make multiple concurrent SAP calls for the same direct connection.

End of the caution.

The following example defines a MultiStepJob interface for jobs that are executed in multiple steps. The program starts a specified number of threads that process two different kinds of jobs, one for stateless and one for stateful communication: StatelessMultiStepExample and StatefulMultiStepExample. Both call the same RFC function modules; StatefulMultiStepExample, however, uses JCoContext.begin and JCoContext.end to define a stateful connection (see Setting Up Stateful Connections).

To execute a stateful call sequence across multiple steps, the MySessionReferenceProvider implementation of the SessionReferenceProvider interface is used.

Example

Multi Threading Scenario

Syntax Syntax

  1. import java.io.File;
  2. import java.io.FileOutputStream;
  3. import java.util.Collection;
  4. import java.util.Hashtable;
  5. import java.util.Properties;
  6. import java.util.concurrent.BlockingQueue;
  7. import java.util.concurrent.CountDownLatch;
  8. import java.util.concurrent.LinkedBlockingQueue;
  9. import java.util.concurrent.TimeUnit;
  10. import java.util.concurrent.atomic.AtomicInteger;
  11. import com.sap.conn.jco.JCoContext;
  12. import com.sap.conn.jco.JCoDestination;
  13. import com.sap.conn.jco.JCoDestinationManager;
  14. import com.sap.conn.jco.JCoException;
  15. import com.sap.conn.jco.JCoFunction;
  16. import com.sap.conn.jco.JCoFunctionTemplate;
  17. import com.sap.conn.jco.ext.DestinationDataProvider;
  18. import com.sap.conn.jco.ext.Environment;
  19. import com.sap.conn.jco.ext.JCoSessionReference;
  20. import com.sap.conn.jco.ext.SessionException;
  21. import com.sap.conn.jco.ext.SessionReferenceProvider;
  22. public class MultiThreadedExample
  23. {
  24.     static String DESTINATION_NAME1 = "ABAP_AS_WITHOUT_POOL";
  25.     static String DESTINATION_NAME2 = "ABAP_AS_WITH_POOL";
  26.     static
  27.     {
  28.         Properties connectProperties = new Properties();
  29.         connectProperties.setProperty(DestinationDataProvider.JCO_ASHOST, "binmain");
  30.         connectProperties.setProperty(DestinationDataProvider.JCO_SYSNR,  "53");
  31.         connectProperties.setProperty(DestinationDataProvider.JCO_CLIENT, "000");
  32.         connectProperties.setProperty(DestinationDataProvider.JCO_USER,   "JCOTEST");
  33.         connectProperties.setProperty(DestinationDataProvider.JCO_PASSWD, "JCOTEST");
  34.         connectProperties.setProperty(DestinationDataProvider.JCO_LANG,   "en");
  35.         createDataFile(DESTINATION_NAME1, "jcoDestination", connectProperties);
  36.         connectProperties.setProperty(DestinationDataProvider.JCO_POOL_CAPACITY, "3");
  37.         connectProperties.setProperty(DestinationDataProvider.JCO_PEAK_LIMIT,    "10");
  38.         createDataFile(DESTINATION_NAME2, "jcoDestination", connectProperties);
  39.     }
  40.     
  41.     static void createDataFile(String name, String suffix, Properties properties)
  42.     {
  43.         File cfg = new File(name+"."+suffix);
  44.         if(!cfg.exists())
  45.         {
  46.             try
  47.             {
  48.                 FileOutputStream fos = new FileOutputStream(cfg, false);
  49.                 properties.store(fos, "for tests only !");
  50.                 fos.close();
  51.             }
  52.             catch (Exception e)
  53.             {
  54.                 throw new RuntimeException("Unable to create the destination file " + cfg.getName(), e);
  55.             }
  56.         }
  57.     }
  58.     
  59.     static void createDestinationDataFile(String destinationName, Properties connectProperties)
  60.     {
  61.         File destCfg = new File(destinationName+".jcoDestination");
  62.         try
  63.         {
  64.             FileOutputStream fos = new FileOutputStream(destCfg, false);
  65.             connectProperties.store(fos, "for tests only !");
  66.             fos.close();
  67.         }
  68.         catch (Exception e)
  69.         {
  70.             throw new RuntimeException("Unable to create the destination files", e);
  71.         }
  72.     }
  /**
   * A unit of work that is processed one step at a time. A worker runs a single step,
   * then asks whether the job is finished; unfinished jobs are queued again.
   */
  interface MultiStepJob
  {
      /** Returns the name that identifies the job in the console output. */
      String getName();

      /** Executes the next step of the job. */
      void runNextStep();

      /** Returns {@code true} when the job has no further steps to run. */
      boolean isFinished();

      /** Called by the worker after {@link #isFinished()} has returned {@code true}. */
      void cleanUp();
  }
  80.     
  81.     static class StatelessMultiStepExample implements MultiStepJob
  82.     {
  83.         static AtomicInteger JOB_COUNT = new AtomicInteger(0); 
  84.         int jobID = JOB_COUNT.addAndGet(1);
  85.         int calls;
  86.         JCoDestination destination;
  87.         
  88.         int executedCalls = 0;
  89.         Exception ex = null;
  90.         int remoteCounter;
  91.         
  92.         StatelessMultiStepExample(JCoDestination destination, int calls) 
  93.         { 
  94.             this.calls = calls;
  95.             this.destination = destination;
  96.         }
  97.         
  98.         public boolean isFinished() { return executedCalls == calls || ex != null; }
  99.         public String getName() { return "stateless Job-"+jobID; }
  100.         
  101.         public void runNextStep()
  102.         {
  103.             try
  104.             {
  105.                 JCoFunction incrementCounter = incrementCounterTemplate.getFunction();
  106.                 incrementCounter.execute(destination);
  107.                 JCoFunction getCounter = getCounterTemplate.getFunction();
  108.                 executedCalls++;
  109.                 
  110.                 if(isFinished())
  111.                 {
  112.                     getCounter.execute(destination);
  113.                     remoteCounter = getCounter.getExportParameterList().getInt("GET_VALUE");
  114.                 }
  115.             }
  116.             catch(JCoException je)
  117.             {
  118.                 ex = je;
  119.             }
  120.             catch(RuntimeException re)
  121.             {
  122.                 ex = re;
  123.             }
  124.         }
  125.         public void cleanUp() 
  126.         {
  127.             StringBuilder sb = new StringBuilder("Task ").append(getName()).append(" is finished ");
  128.             if(ex!=null)
  129.                 sb.append("with exception ").append(ex.toString());
  130.             else
  131.                 sb.append("successful. Counter is ").append(remoteCounter);
  132.             System.out.println(sb.toString());
  133.         }
  134.         
  135.     }
  136.     
  137.     static class StatefulMultiStepExample extends StatelessMultiStepExample
  138.     {
  139.         StatefulMultiStepExample(JCoDestination destination, int calls) 
  140.         { 
  141.             super(destination, calls);
  142.             
  143.         }
  144.         @Override
  145.         public String getName() { return "stateful Job-"+jobID; }
  146.         
  147.         @Override
  148.         public void runNextStep()
  149.         {
  150.             if(executedCalls == 0)
  151.                 JCoContext.begin(destination);
  152.             super.runNextStep();
  153.         }
  154.         
  155.         @Override
  156.         public void cleanUp() 
  157.         {
  158.             try
  159.             {
  160.                 JCoContext.end(destination);
  161.             }
  162.             catch (JCoException je)
  163.             {
  164.                 ex = je;
  165.             }
  166.             super.cleanUp();
  167.         }
  168.     }
  169.     
  170.     static class MySessionReferenceProvider implements SessionReferenceProvider
  171.     {
  172.         public JCoSessionReference getCurrentSessionReference(String scopeType)
  173.         {
  174.             MySessionReference sesRef = WorkerThread.localSessionReference.get();
  175.             if(sesRef != null)
  176.                 return sesRef; 
  177.             
  178.             throw new RuntimeException("Unknown thread:" + Thread.currentThread().getId());
  179.         }
  180.         public boolean isSessionAlive(String sessionId)
  181.         {
  182.             Collection<MySessionReference> availableSessions = WorkerThread.sessions.values();
  183.             for(MySessionReference ref : availableSessions)
  184.             {
  185.                 if(ref.getID().equals(sessionId))
  186.                     return true;
  187.             }
  188.             return false;
  189.         }
  190.         public void jcoServerSessionContinued(String sessionID) throws SessionException
  191.         {
  192.         }
  193.         public void jcoServerSessionFinished(String sessionID)
  194.         {
  195.         }
  196.         public void jcoServerSessionPassivated(String sessionID) throws SessionException
  197.         {
  198.         }
  199.         public JCoSessionReference jcoServerSessionStarted() throws SessionException
  200.         {
  201.             return null;
  202.         }
  203.     }
  204.     
  205.     static class MySessionReference implements JCoSessionReference
  206.     {
  207.         static AtomicInteger atomicInt = new AtomicInteger(0);
  208.         private String id = "session-"+String.valueOf(atomicInt.addAndGet(1));;
  209.         
  210.         public void contextFinished()
  211.         {
  212.         }
  213.         public void contextStarted()
  214.         {
  215.         }
  216.         public String getID()
  217.         {
  218.             return id;
  219.         }
  220.         
  221.     }
  222.     
  223.     static class WorkerThread extends Thread
  224.     {
  225.         static Hashtable<MultiStepJob, MySessionReference> sessions = new Hashtable<MultiStepJob, MySessionReference>();
  226.         static ThreadLocal<MySessionReference> localSessionReference = new ThreadLocal<MySessionReference>();
  227.         
  228.         private CountDownLatch doneSignal;
  229.         WorkerThread(CountDownLatch doneSignal)
  230.         {
  231.             this.doneSignal = doneSignal;
  232.         }
  233.         
  234.         @Override
  235.         public void run()
  236.         {
  237.             try
  238.             {
  239.                 for(;;)
  240.                 {
  241.                     MultiStepJob job = queue.poll(10, TimeUnit.SECONDS);
  242.                     
  243.                     //stop if nothing to do
  244.                     if(job == null)
  245.                         return;
  246.                     MySessionReference sesRef = sessions.get(job);
  247.                     if(sesRef == null)
  248.                     {
  249.                         sesRef = new MySessionReference();
  250.                         sessions.put(job, sesRef);
  251.                     }
  252.                     localSessionReference.set(sesRef);
  253.                     
  254.                     System.out.println("Task "+job.getName()+" is started.");
  255.                     try
  256.                     {
  257.                         job.runNextStep();
  258.                     }
  259.                     catch (Throwable th)
  260.                     {
  261.                         th.printStackTrace();
  262.                     }
  263.                     if(job.isFinished())
  264.                     {
  265.                         System.out.println("Task "+job.getName()+" is finished.");
  266.                         sessions.remove(job);
  267.                         job.cleanUp();
  268.                     }
  269.                     else
  270.                     {
  271.                         System.out.println("Task "+job.getName()+" is passivated.");
  272.                         queue.add(job);
  273.                     }
  274.                     localSessionReference.set(null);
  275.                 }
  276.             }
  277.             catch (InterruptedException e)
  278.             {
  279.                 //just leave
  280.             }
  281.             finally
  282.             {
  283.                 doneSignal.countDown();
  284.             }
  285.         }
  286.     }
  287.     private static BlockingQueue<MultiStepJob> queue = new LinkedBlockingQueue<MultiStepJob>();
  288.     private static JCoFunctionTemplate incrementCounterTemplate, getCounterTemplate;
  289.     
  290.     static void runJobs(JCoDestination destination, int jobCount, int threadCount)
  291.     {
  292.         System.out.println(">>> Start");
  293.         for(int i = 0; i < jobCount; i++)
  294.         {
  295.             queue.add(new StatelessMultiStepExample(destination, 10));
  296.             queue.add(new StatefulMultiStepExample(destination, 10));
  297.         }
  298.         CountDownLatch doneSignal = new CountDownLatch(threadCount);
  299.         for(int i = 0; i < threadCount; i++)
  300.             new WorkerThread(doneSignal).start();
  301.         
  302.         System.out.print(">>> Wait ... ");
  303.         try
  304.         {
  305.             doneSignal.await();
  306.         }
  307.         catch (InterruptedException ie)
  308.         {
  309.             //just leave
  310.         }
  311.         System.out.println(">>> Done");
  312.     }
  313.     public static void main(String[] argv)
  314.     {
  315.         //JCo.setTrace(5, ".");
  316.         Environment.registerSessionReferenceProvider(new MySessionReferenceProvider());
  317.         try
  318.         {
  319.             JCoDestination destination = JCoDestinationManager.getDestination(DESTINATION_NAME2);
  320.             incrementCounterTemplate = destination.getRepository().getFunctionTemplate("Z_INCREMENT_COUNTER");
  321.             getCounterTemplate = destination.getRepository().getFunctionTemplate("Z_GET_COUNTER");
  322.             if(incrementCounterTemplate == null || getCounterTemplate == null)
  323.                 throw new RuntimeException("This example cannot run without Z_INCREMENT_COUNTER and Z_GET_COUNTER functions");
  324.             runJobs(destination, 5, 2);
  325.         }
  326.         catch(JCoException je)
  327.         {
  328.             je.printStackTrace();
  329.         }
  330.         
  331.     }
  332. }
End of the code.