...
Code Block |
---|
final boolean useThreads = GrouperLoaderConfig.retrieveConfig().propertyValueBoolean("loader.use.groupThreads", true); //see when threads are done processing List<GrouperFuture> futures = new ArrayList<GrouperFuture>(); //if there were thread problems, run those again List<GrouperCallable> callablesWithProblems = new ArrayList<GrouperCallable>(); int groupThreadPoolSize = GrouperLoaderConfig.retrieveConfig().propertyValueInt("loader.groupThreadPoolSize", 20); for (final String groupName : groupNamesToSync) { if (LOG.isDebugEnabled()) { LOG.debug(groupNameOverall + ": syncing membership for " + groupName + " " + count + " out of " + groupNamesToSync.size() + " groups"); } GrouperCallable<Void> grouperCallable = new GrouperCallable<Void>("syncLogicForOneGroup") { @Override public Void callLogic() { syncGroupLogicForOneGroup(grouperLoaderResultsetOverall, GrouperSession.staticGrouperSession(), andGroups, groupTypes, hib3GrouploaderLogOverall, statusOverall, groupNameToDisplayName, groupNameToDescription, privsToAdd, groupStartedMillis, membershipsInRegistry, groupName); return null; } }; if (!useThreads) { grouperCallable.callLogic(); } else { GrouperFuture<Void> future = GrouperUtil.executorServiceSubmit(GrouperUtil.retrieveExecutorService(), grouperCallable); futures.add(future); // if there are more threads running that the max, then wait for some to finish until below limit GrouperFuture.waitForJob(futures, groupThreadPoolSize, callablesWithProblems); } count++; } // wait for the rest GrouperFuture.waitForJob(futures, 0, callablesWithProblems); // if there was a problem in any threads, try them again GrouperCallable.tryCallablesWithProblems(callablesWithProblems); |
Retrying errors
In the above code, if an error is found, it will be tried again not in a thread. A warning will be logged for each thread that has a problem. If no other errors occur, then everything is fine and ignore it. Here is an example of the warning:
Code Block |
---|
2014-11-07 08:02:32,590: [main] WARN GrouperFuture.waitForJob(155) - - Non fatal problem with callable. Will try again not in thread
java.lang.RuntimeException: testing,
Problem in job: syncLogicForOneGroup: loader:group1_systemOfRecord
at edu.internet2.middleware.grouper.app.loader.GrouperLoaderType$10.callLogic(GrouperLoaderType.java:1387)
at edu.internet2.middleware.grouper.app.loader.GrouperLoaderType$10.callLogic(GrouperLoaderType.java:1)
at edu.internet2.middleware.grouper.util.GrouperCallable$1.callback(GrouperCallable.java:151)
at edu.internet2.middleware.grouper.GrouperSession.callbackGrouperSession(GrouperSession.java:974)
at edu.internet2.middleware.grouper.util.GrouperCallable.callLogicWithSessionIfExists(GrouperCallable.java:148)
at edu.internet2.middleware.grouper.util.GrouperCallable.call(GrouperCallable.java:121)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
|
Transactional problems
There are transactional problems with multi-threading. For example in the loader, it tries to process multiple groups at once. And if the parent stems dont exist for those groups, each thread will try to create those. And if they are the same stem, there are issues. This should be handled at the API layer so that the API can handle multithreaded operations in other areas. Not only does this need to be synchronized, but there needs to be some flags so that threads know that another tx is working on creating the resource, and then it should wait a bit for thaat transaction to finish. This is currently implemented for stem creation and member creation.
...