Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
      final boolean useThreads = GrouperLoaderConfig.retrieveConfig().propertyValueBoolean("loader.use.groupThreads", true);

      //see when threads are done processing
      List<GrouperFuture> futures = new ArrayList<GrouperFuture>();

      //if there were thread problems, run those again
      List<GrouperCallable> callablesWithProblems = new ArrayList<GrouperCallable>();

      int groupThreadPoolSize = GrouperLoaderConfig.retrieveConfig().propertyValueInt("loader.groupThreadPoolSize", 20);

      for (final String groupName : groupNamesToSync) {
        
        if (LOG.isDebugEnabled()) {
          LOG.debug(groupNameOverall + ": syncing membership for " + groupName + " " + count + " out of " + groupNamesToSync.size() + " groups");
        }
        
        GrouperCallable<Void> grouperCallable = new GrouperCallable<Void>("syncLogicForOneGroup") {

          @Override
          public Void callLogic() {
            syncGroupLogicForOneGroup(grouperLoaderResultsetOverall,
                GrouperSession.staticGrouperSession(), andGroups, groupTypes, hib3GrouploaderLogOverall,
                statusOverall, groupNameToDisplayName, groupNameToDescription, privsToAdd,
                groupStartedMillis, membershipsInRegistry, groupName);
            return null;
          }
        };

        if (!useThreads) {
          grouperCallable.callLogic();
        } else {
          GrouperFuture<Void> future = GrouperUtil.executorServiceSubmit(GrouperUtil.retrieveExecutorService(), grouperCallable);
          futures.add(future);
          
          // if there are more threads running that the max, then wait for some to finish until below limit
          GrouperFuture.waitForJob(futures, groupThreadPoolSize, callablesWithProblems);

        }

        count++;
      }

      // wait for the rest
      GrouperFuture.waitForJob(futures, 0, callablesWithProblems);


      // if there was a problem in any threads, try them again
      GrouperCallable.tryCallablesWithProblems(callablesWithProblems);
      

Retrying errors

In the above code, if an error is found, it will be tried again not in a thread.  A warning will be logged for each thread that has a problem.  If no other errors occur, then everything is fine and ignore it.  Here is an example of the warning:

Code Block

2014-11-07 08:02:32,590: [main] WARN  GrouperFuture.waitForJob(155) -  - Non fatal problem with callable.  Will try again not in thread
java.lang.RuntimeException: testing,
Problem in job: syncLogicForOneGroup: loader:group1_systemOfRecord
 at edu.internet2.middleware.grouper.app.loader.GrouperLoaderType$10.callLogic(GrouperLoaderType.java:1387)
 at edu.internet2.middleware.grouper.app.loader.GrouperLoaderType$10.callLogic(GrouperLoaderType.java:1)
 at edu.internet2.middleware.grouper.util.GrouperCallable$1.callback(GrouperCallable.java:151)
 at edu.internet2.middleware.grouper.GrouperSession.callbackGrouperSession(GrouperSession.java:974)
 at edu.internet2.middleware.grouper.util.GrouperCallable.callLogicWithSessionIfExists(GrouperCallable.java:148)
 at edu.internet2.middleware.grouper.util.GrouperCallable.call(GrouperCallable.java:121)
 at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
 at java.util.concurrent.FutureTask.run(FutureTask.java:138)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
 at java.lang.Thread.run(Thread.java:662)

Transactional problems

There are transactional problems with multi-threading.  For example in the loader, it tries to process multiple groups at once.  And if the parent stems dont exist for those groups, each thread will try to create those.  And if they are the same stem, there are issues.  This should be handled at the API layer so that the API can handle multithreaded operations in other areas.  Not only does this need to be synchronized, but there needs to be some flags so that threads know that another tx is working on creating the resource, and then it should wait a bit for thaat transaction to finish.  This is currently implemented for stem creation and member creation.

...