ThreadPool процессов CLI

Мне нужно передать сообщения в CLI-процессы PHP через stdin из Java. Я хотел бы сохранить около 20 процессов PHP, запущенных в пуле, так что когда я передаю сообщение пулу, он отправляет каждое сообщение в отдельный поток, сохраняя очередь сообщений для доставки. Я бы хотел, чтобы эти PHP-процессы оставались в живых как можно дольше, создавая новый, если он умирает. Я смотрел на это со статическим пулом потоков, но он, похоже, больше предназначен для задач, которые выполняются и просто умирают. Как я могу это сделать, используя простой интерфейс для передачи сообщения в пул? Должен ли я реализовать свой собственный «пул потоков»?

Я предоставляю некоторый код с этим, поскольку я думаю, что это сделает вещи более ясными. В основном вам нужно поддерживать пул объектов процесса. Будьте внимательны, что каждый из этих процессов имеет поток ввода, вывода и ошибок, которым вам нужно управлять каким-то образом. В моем примере я просто перенаправляю ошибку и выводя на консоль основных процессов. Вы можете настроить обратные вызовы и обработчики, чтобы получить выход программы PHP, если это необходимо. Если вы просто обрабатываете задачи и не заботитесь о том, что говорит PHP, оставьте это как есть или переадресовываете файл.

Я использую библиотеку Apache Commons Pool для ObjectPool. Не нужно изобретать его.

У вас будет пул из 20 процессов, которые запускают вашу программу PHP. Только это не даст вам то, что вам нужно. Возможно, вы захотите обработать задачи по всем 20 из этих процессов «в то же время». Таким образом, вам также понадобится ThreadPool, который вытащит процесс из ObjectPool.

Вам также нужно будет понять, что если вы убьете или CTRL-C ваш процесс Java, процесс init будет обрабатывать ваши php-процессы, и они просто будут сидеть там. Вероятно, вы захотите сохранить журнал всех pid-процессов PHP-процессов, которые вы создаете, и затем очистить их, если вы повторно запустите свою Java-программу.

 public class StackOverflow_10037379 { private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName()); public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { private String mProcessToRun; public CLIPoolableObjectFactory(String processToRun) { mProcessToRun = processToRun; } @Override public Process makeObject() throws Exception { ProcessBuilder builder = new ProcessBuilder(); builder.redirectError(Redirect.INHERIT); // I am being lazy, but really the InputStream is where // you can get any output of the PHP Process. This setting // will make it output to the current processes console. builder.redirectOutput(Redirect.INHERIT); builder.redirectInput(Redirect.PIPE); builder.command(mProcessToRun); return builder.start(); } @Override public boolean validateObject(Process process) { try { process.exitValue(); return false; } catch (IllegalThreadStateException ex) { return true; } } @Override public void destroyObject(Process process) throws Exception { // If PHP has a way to stop it, do that instead of destroy process.destroy(); } @Override public void passivateObject(Process process) throws Exception { // Should really try to read from the InputStream of the Process // to prevent lock-ups if Rediret.INHERIT is not used. } } public static class CLIWorkItem implements Runnable { private ObjectPool<Process> mPool; private String mWork; public CLIWorkItem(ObjectPool<Process> pool, String work) { mPool = pool; mWork = work; } @Override public void run() { Process workProcess = null; try { workProcess = mPool.borrowObject(); OutputStream os = workProcess.getOutputStream(); os.write(mWork.getBytes(Charset.forName("UTF-8"))); os.flush(); // Because of the INHERIT rule with the output stream // the console stream overwrites itself. REMOVE THIS in production. Thread.sleep(100); } catch (Exception ex) { sLogger.log(Level.SEVERE, null, ex); } finally { if (workProcess != null) { try { // Seriously.. so many exceptions. mPool.returnObject(workProcess); } catch (Exception ex) { sLogger.log(Level.SEVERE, null, ex); } } } } } public static void main(String[] args) throws Exception { // Change the 5 to 20 in your case. // Also change mock_php.exe to /usr/bin/php or wherever. ObjectPool<Process> pool = new GenericObjectPool<>( new CLIPoolableObjectFactory("mock_php.exe"), 5); // This will only allow you to queue 100 work items at a time. I would suspect // that if you only want 20 PHP processes running at a time and this queue // filled up you'll need to implement some other strategy as you are doing // more work than PHP can keep up with. You'll need to block at some point // or throw work away. BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100, true); ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); // print some stuff out. executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); executor.shutdown(); executor.awaitTermination(4000, TimeUnit.HOURS); pool.close(); } } 

Результат выполнения программы:

 12172 - Message 2 10568 - Message 1 4804 - Message 3 11916 - Message 4 11116 - Message 5 12172 - Message 6 4804 - Message 7 10568 - Message 8 11916 - Message 9 11116 - Message 10 12172 - Message 11 

Код программы C ++ для вывода только что введенного:

 #include <windows.h> #include <iostream> #include <string> int main(int argc, char* argv[]) { DWORD pid = GetCurrentProcessId(); std::string line; while (true) { std::getline (std::cin, line); std::cout << pid << " - " << line << std::endl; } return 0; } 

Обновить

Извините за задержку. Вот версия JDK 6 для всех, кого это интересует. Вам нужно будет запустить отдельный поток, чтобы прочитать все входные данные из InputStream процесса. Я установил этот код, чтобы создать новый поток вдоль каждого нового процесса. Этот поток всегда читается из процесса, пока он жив. Вместо прямого вывода в файл я настроил его так, чтобы он использовал структуру ведения журнала. Таким образом, вы можете настроить конфигурацию ведения журнала, чтобы перейти к файлу, перевернуть его, перейти на консоль и т. Д., Не будучи жестко закодированным для перехода к файлу.

Вы заметите, что я запускаю только одного Gobbler для каждого процесса, даже если у процесса есть stdout и stderr. Я перенаправляю stderr на stdout, чтобы упростить работу. Очевидно, что jdk6 поддерживает только этот тип перенаправления.

 public class StackOverflow_10037379_jdk6 { private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName()); // Shamelessy taken from Google and modified. // I don't know who the original Author is. public static class StreamGobbler extends Thread { InputStream is; Logger logger; Level level; StreamGobbler(String logName, Level level, InputStream is) { this.is = is; this.logger = Logger.getLogger(logName); this.level = level; } public void run() { try { InputStreamReader isr = new InputStreamReader(is); BufferedReader br = new BufferedReader(isr); String line = null; while ((line = br.readLine()) != null) { logger.log(level, line); } } catch (IOException ex) { logger.log(Level.SEVERE, "Failed to read from Process.", ex); } logger.log( Level.INFO, String.format("Exiting Gobbler for %s.", logger.getName())); } } public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { private String mProcessToRun; public CLIPoolableObjectFactory(String processToRun) { mProcessToRun = processToRun; } @Override public Process makeObject() throws Exception { ProcessBuilder builder = new ProcessBuilder(); builder.redirectErrorStream(true); builder.command(mProcessToRun); Process process = builder.start(); StreamGobbler loggingGobbler = new StreamGobbler( String.format("process.%s", process.hashCode()), Level.INFO, process.getInputStream()); loggingGobbler.start(); return process; } @Override public boolean validateObject(Process process) { try { process.exitValue(); return false; } catch (IllegalThreadStateException ex) { return true; } } @Override public void destroyObject(Process process) throws Exception { // If PHP has a way to stop it, do that instead of destroy process.destroy(); } @Override public void passivateObject(Process process) throws Exception { // Should really try to read from the InputStream of the Process // to prevent lock-ups if Rediret.INHERIT is not used. } } public static class CLIWorkItem implements Runnable { private ObjectPool<Process> mPool; private String mWork; public CLIWorkItem(ObjectPool<Process> pool, String work) { mPool = pool; mWork = work; } @Override public void run() { Process workProcess = null; try { workProcess = mPool.borrowObject(); OutputStream os = workProcess.getOutputStream(); os.write(mWork.getBytes(Charset.forName("UTF-8"))); os.flush(); // Because of the INHERIT rule with the output stream // the console stream overwrites itself. REMOVE THIS in production. Thread.sleep(100); } catch (Exception ex) { sLogger.log(Level.SEVERE, null, ex); } finally { if (workProcess != null) { try { // Seriously.. so many exceptions. mPool.returnObject(workProcess); } catch (Exception ex) { sLogger.log(Level.SEVERE, null, ex); } } } } } public static void main(String[] args) throws Exception { // Change the 5 to 20 in your case. ObjectPool<Process> pool = new GenericObjectPool<Process>( new CLIPoolableObjectFactory("mock_php.exe"), 5); BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true); ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); // print some stuff out. executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); executor.shutdown(); executor.awaitTermination(4000, TimeUnit.HOURS); pool.close(); } } 

Вывод

 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 9440 - Message 3 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 8776 - Message 2 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 6100 - Message 1 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 10096 - Message 4 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 8868 - Message 5 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 8868 - Message 8 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 6100 - Message 10 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 8776 - Message 9 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 10096 - Message 6 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 9440 - Message 7 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: 6100 - Message 11 Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: Exiting Gobbler for process.295131993. Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: Exiting Gobbler for process.756434719. Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: Exiting Gobbler for process.332711452. Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: Exiting Gobbler for process.1981440623. Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run INFO: Exiting Gobbler for process.1043636732. 

Лучше всего здесь использовать функции pcntl для вилки процесса, но общение между процессами затруднено. Я бы рекомендовал создать очередь, из которой могут обрабатывать ваши процессы, а не пытаться передать сообщения в командную строку.

В Beanstalk есть несколько PHP-клиентов, которые можно использовать для обработки обмена сообщениями между процессами.