Browse Source

* lib-scheduler-engines/src/main/java/com/fourelementscapital/scheduler/engines/PythonScript.java

* lib-scheduler-engines/src/main/java/com/fourelementscapital/scheduler/group/PythonScriptTask.java
      Added new: the new python engine based on cpython

  * lib-scheduler-queue/src/main/java/com/fourelementscapital/scheduler/peer/QueueFactory.java
      Added: the new factory for cpython

  * lib-scheduler-queue/src/main/java/com/fourelementscapital/scheduler/ScheduledTaskFactory.java
      Adjusted: accordingly to hook up the new engine
            Associated SQL statements needed in bbsync database:

              update scheduler_group set enginetype = 'pscript4pythonengine'
                where taskuid = 'rscript4rserveunix_python';

              insert into scheduler_taskpeers( taskuid, peername )
                values( 'rscript4rserveunix_python', '4ecapsvid13' );
master^2
Bernarto P Tjahjono 3 years ago
parent
commit
adf29ba998
  1. 117
      scheduler_libraries/lib-scheduler-engines/src/main/java/com/fourelementscapital/scheduler/engines/PythonScript.java
  2. 77
      scheduler_libraries/lib-scheduler-engines/src/main/java/com/fourelementscapital/scheduler/group/PythonScriptTask.java
  3. 9
      scheduler_libraries/lib-scheduler-queue/src/main/java/com/fourelementscapital/scheduler/ScheduledTaskFactory.java
  4. 53
      scheduler_libraries/lib-scheduler-queue/src/main/java/com/fourelementscapital/scheduler/peer/QueueFactory.java

117
scheduler_libraries/lib-scheduler-engines/src/main/java/com/fourelementscapital/scheduler/engines/PythonScript.java

@ -0,0 +1,117 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.fourelementscapital.scheduler.engines;
import org.quartz.JobExecutionException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import com.fourelementscapital.scheduler.exception.SchedulerException;
/**
*
* @author bernarto
*/
public abstract class PythonScript extends ScheduledTask {
public PythonScript(String name, String uid) {
super(name, uid);
}
@Override
public void execute(StackFrame sframe) throws JobExecutionException,
SchedulerException,
Exception {
if (sframe == null || sframe.getData() == null ||
sframe.getData().get("rscript") == null ||
sframe.getData().get("id") == null) {
return;
}
String script = (String)sframe.getData().get("rscript");
Number nid = (Number)sframe.getData().get("id");
File py = File.createTempFile( nid.intValue() + "-" +
sframe.getTrigger_time(), ".py");
try ( FileWriter writer = new FileWriter(py) ) {
writer.write( script );
}
StringBuilder sb = new StringBuilder(1024);
Process process = runPythonScript( py.getName() );
int exitcode = 0;
try ( InputStreamReader isr =
new InputStreamReader(process.getInputStream()) ) {
try ( BufferedReader reader = new BufferedReader(isr) ) {
String line = reader.readLine();
while (line != null) {
sb.append(line).append("\n");
line = reader.readLine();
}
}
}
finally {
sframe.setConsole_message( sb.toString() );
exitcode = process.waitFor();
sframe.setTasklog(null);
py.delete();
}
if (exitcode != 0) {
throw new SyntaxError("Syntax errors in python script");
}
}
private Process runPythonScript(String scriptfile) throws IOException {
synchronized (this) {
getBuilder().command("python", scriptfile);
return getBuilder().start();
}
}
private ProcessBuilder getBuilder() {
ProcessBuilder local = null;
if ((local = _builder) == null) {
synchronized (this) {
if ((local = _builder) == null) {
_builder = new ProcessBuilder();
_builder.redirectErrorStream( true );
// _builder.environment().put("PYTHONPATH", "");
// _builder.environment().put("PYTHONHOME", "");
String tmpdir = System.getProperty("java.io.tmpdir");
if (tmpdir != null && tmpdir.length() > 0) {
_builder.directory( new File(tmpdir) );
}
}
}
}
return _builder;
}
private volatile ProcessBuilder _builder;
public class SyntaxError extends SchedulerException {
public SyntaxError(String msg) {
super(msg);
}
public int getErrorcode() {
return ERROR_SERVER_GENERAL_SCRIPT_ERROR;
}
}
}

77
scheduler_libraries/lib-scheduler-engines/src/main/java/com/fourelementscapital/scheduler/group/PythonScriptTask.java

@ -0,0 +1,77 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.fourelementscapital.scheduler.group;
import com.fourelementscapital.db.vo.FlexiField;
import com.fourelementscapital.scheduler.engines.PythonScript;
import com.fourelementscapital.scheduler.engines.ScheduledTaskField;
import java.util.Vector;
/**
*
* @author bernarto
*/
public class PythonScriptTask extends PythonScript {
public static final String ENGINE_NAME = "pscript4pythonengine";
public PythonScriptTask(String name, String uid) {
super (name, uid);
try {
addFormFields( getMyFields() );
}
catch ( Exception e ) {
e.printStackTrace();
}
}
protected Vector<ScheduledTaskField> getMyFields(){
Vector<ScheduledTaskField> fields = new Vector<>();
//list
ScheduledTaskField f3 = new ScheduledTaskField();
f3.setShortname( "rscript_param" );
f3.setFieldlabel("");
f3.setFieldtype( FlexiField.TYPE_RSCRIPTEDITOR_PARAM );
f3.setPlacementform( "codeinject" );
fields.add( f3 );
ScheduledTaskField f4 = new ScheduledTaskField();
f4.setShortname( "rscript" );
f4.setFieldlabel( "Python Script" );
f4.setFieldtype( FlexiField.TYPE_RSCRIPTEDITOR );
fields.add(f4);
fields.addAll( getAdditionalRScriptField() );
return fields;
}
public Vector<ScheduledTaskField> getAdditionalRScriptField(){
Vector<ScheduledTaskField> fields = new Vector<>();
ScheduledTaskField f5=new ScheduledTaskField();
f5.setShortname( "trigger_commodity" );
f5.setFieldlabel( "Trigger Commodity" );
f5.setFieldtype( FlexiField.TYPE_TEXTBOX );
f5.setFineprint( "Building block which fixing time defines the "
+ "trigger time for Zero-day lag indicators e.g: CL,C,JNso");
f5.setPlacementform( "buildingblock" );
fields.add( f5 );
ScheduledTaskField f6 = new ScheduledTaskField();
f6.setShortname( "trigger_frequency" );
f6.setFieldlabel( "Trigger Frequency" );
f6.setFieldtype( FlexiField.TYPE_TEXTBOX );
f6.setFineprint( "Specific Intervals: 1/3 (starting at 1 and every 3 minutes)" );
f6.setPlacementform( "buildingblock" );
fields.add( f6 );
return fields;
}
}

9
scheduler_libraries/lib-scheduler-queue/src/main/java/com/fourelementscapital/scheduler/ScheduledTaskFactory.java

@ -20,6 +20,7 @@ import com.fourelementscapital.db.SchedulerDB;
import com.fourelementscapital.scheduler.engines.ScheduledTask;
import com.fourelementscapital.scheduler.group.BBDownloadScheduledTask;
import com.fourelementscapital.scheduler.group.DirectRServeExecuteRUnix;
import com.fourelementscapital.scheduler.group.PythonScriptTask;
import com.fourelementscapital.scheduler.group.REngineScriptTask;
import com.fourelementscapital.scheduler.group.RScriptScheduledTask;
import com.fourelementscapital.scheduler.group.RServeScheduledTask;
@ -38,7 +39,8 @@ public class ScheduledTaskFactory {
init();
}
public synchronized void refreshTaskLoaded(){
//public synchronized void refreshTaskLoaded(){
public void refreshTaskLoaded(){
synchronized(scheduledTasks){
if(allTasks.size()>0 || scheduledTasks.size()>0){
allTasks.clear();
@ -72,9 +74,10 @@ public class ScheduledTaskFactory {
if(enginetype.equalsIgnoreCase("rscript")){st=new RScriptScheduledTask(name,taskuid); }
if(enginetype.equalsIgnoreCase("rscript4rserve")){st=new RServeScheduledTask(name,taskuid); }
if(enginetype.equalsIgnoreCase("bb_download")){st=new BBDownloadScheduledTask(name,taskuid); }
if(enginetype.equalsIgnoreCase(RServeUnixTask.ENGINE_NAME)){st=new RServeUnixTask(name,taskuid); }
if(enginetype.equalsIgnoreCase(RServeUnixTask.ENGINE_NAME)){st=new RServeUnixTask(name,taskuid); } //rscript4rserveunix
if(enginetype.equalsIgnoreCase("direct_script")){st=new REngineScriptTask(name,taskuid); }
if(enginetype.equalsIgnoreCase("direct_script_unix")){st=new DirectRServeExecuteRUnix(name,taskuid);}
if(enginetype.equalsIgnoreCase(PythonScriptTask.ENGINE_NAME)){st=new PythonScriptTask(name,taskuid);}
//if(enginetype.equalsIgnoreCase("rscript4rconsole")){st=new RConsoleScript(name,taskuid);}
if(st!=null){
@ -83,7 +86,7 @@ public class ScheduledTaskFactory {
}
}catch(Exception e){
e.printStackTrace();
}finally{
try{
sdb.closeDB();

53
scheduler_libraries/lib-scheduler-queue/src/main/java/com/fourelementscapital/scheduler/peer/QueueFactory.java

@ -15,6 +15,9 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.Vector;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -25,6 +28,7 @@ import com.fourelementscapital.scheduler.engines.ScheduledTask;
import com.fourelementscapital.scheduler.engines.StackFrame;
import com.fourelementscapital.scheduler.p2p.P2PService;
import com.fourelementscapital.scheduler.p2p.peer.PeerSpecificConfigurations;
import com.fourelementscapital.scheduler.exception.ExceptionPeerRejected;
public class QueueFactory {
@ -49,10 +53,12 @@ public class QueueFactory {
public QueueAbstract getQueue(String taskuid){
QueueAbstract queueAbs = (QueueAbstract)queueHandlers.get(taskuid);
return (QueueAbstract)queueHandlers.get(taskuid);
}
public TreeMap getQueue(){
return queueHandlers;
}
@ -144,17 +150,16 @@ public class QueueFactory {
}
private void initQueue(){
SchedulerDB sdb=SchedulerDB.getSchedulerDB();
try{
sdb.connectDB();
sdb.removeAllPeerThreadStatus(P2PService.getComputerName());
}catch(Exception e){ }finally{
try{sdb.closeDB();}catch(Exception e1){}
}
//over-rides
QueueAbstract rserv=new QueueAbstract("rserv"){
QueueAbstract rserv=new QueueAbstract("rserv"){ /////////////////////////////////////////////////////////////////////////
public Vector getTaskUids(){
Vector v=new Vector();
@ -203,7 +208,6 @@ public class QueueFactory {
};
add2Qhandler(rserv);
//over-rides
QueueAbstract rservunix=new QueueAbstract("rservunix"){
public Vector getTaskUids(){
@ -246,6 +250,43 @@ public class QueueFactory {
};
add2Qhandler(rservunix);
//over-rides
QueueAbstract cpythonlinux = new QueueAbstract("cpythonlinux") {
@Override
public Vector getTaskUids() {
Vector result = new Vector();
SchedulerDB sdb = SchedulerDB.getSchedulerDB();
try {
sdb.connectDB();
Vector< Map<String, String> > grp = sdb.getGroups( "pscript4pythonengine" );
for (Map<String, String> data : grp) {
result.add( data.get( "taskuid" ) );
}
}
catch (Exception e) {
}
finally {
try {
sdb.closeDB();
}
catch (Exception e1) {
}
}
return result;
}
@Override
public int getConcurrentThreads() {
int result = 15; // For now
return result;
}
};
add2Qhandler(cpythonlinux);
//over-rides
QueueAbstract rscript=new QueueAbstract("rscript"){
public Vector getTaskUids(){
@ -278,7 +319,6 @@ public class QueueFactory {
return 1;
}
};
add2Qhandler(rscript);
QueueAbstract othercripts=new QueueAbstract("othercripts"){
@ -297,12 +337,13 @@ public class QueueFactory {
return 1;
}
};
add2Qhandler(othercripts);
add2Qhandler(othercripts);
}
private void add2Qhandler(QueueAbstract qa){
for(Iterator i=qa.getTaskUids().iterator();i.hasNext();){
queueHandlers.put(i.next(), qa);
}

Loading…
Cancel
Save