multiprocess.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. 'use strict';
  2. const debug = require('debug')('log4js:multiprocess');
  3. const net = require('net');
  4. const LoggingEvent = require('../LoggingEvent');
  5. const END_MSG = '__LOG4JS__';
  6. /**
  7. * Creates a server, listening on config.loggerPort, config.loggerHost.
  8. * Output goes to config.actualAppender (config.appender is used to
  9. * set up that appender).
  10. */
  11. function logServer(config, actualAppender, levels) {
  12. /**
  13. * Takes a utf-8 string, returns an object with
  14. * the correct log properties.
  15. */
  16. function deserializeLoggingEvent(clientSocket, msg) {
  17. debug('deserialising log event');
  18. const loggingEvent = LoggingEvent.deserialise(msg);
  19. loggingEvent.remoteAddress = clientSocket.remoteAddress;
  20. loggingEvent.remotePort = clientSocket.remotePort;
  21. return loggingEvent;
  22. }
  23. /* eslint prefer-arrow-callback:0 */
  24. const server = net.createServer(function serverCreated(clientSocket) {
  25. clientSocket.setEncoding('utf8');
  26. let logMessage = '';
  27. function logTheMessage(msg) {
  28. if (logMessage.length > 0) {
  29. debug('deserialising log event and sending to actual appender');
  30. actualAppender(deserializeLoggingEvent(clientSocket, msg));
  31. }
  32. }
  33. function chunkReceived(chunk) {
  34. debug('chunk of data received');
  35. let event;
  36. logMessage += chunk || '';
  37. if (logMessage.indexOf(END_MSG) > -1) {
  38. event = logMessage.substring(0, logMessage.indexOf(END_MSG));
  39. logTheMessage(event);
  40. logMessage = logMessage.substring(event.length + END_MSG.length) || '';
  41. // check for more, maybe it was a big chunk
  42. chunkReceived();
  43. }
  44. }
  45. function handleError(error) {
  46. const loggingEvent = {
  47. startTime: new Date(),
  48. categoryName: 'log4js',
  49. level: levels.ERROR,
  50. data: ['A worker log process hung up unexpectedly', error],
  51. remoteAddress: clientSocket.remoteAddress,
  52. remotePort: clientSocket.remotePort
  53. };
  54. actualAppender(loggingEvent);
  55. }
  56. clientSocket.on('data', chunkReceived);
  57. clientSocket.on('end', chunkReceived);
  58. clientSocket.on('error', handleError);
  59. });
  60. server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost', function () {
  61. debug('master server listening');
  62. // allow the process to exit, if this is the only socket active
  63. server.unref();
  64. });
  65. function app(event) {
  66. debug('log event sent directly to actual appender (local event)');
  67. return actualAppender(event);
  68. }
  69. app.shutdown = function (cb) {
  70. debug('master shutdown called, closing server');
  71. server.close(cb);
  72. };
  73. return app;
  74. }
  75. function workerAppender(config) {
  76. let canWrite = false;
  77. const buffer = [];
  78. let socket;
  79. let shutdownAttempts = 3;
  80. function write(loggingEvent) {
  81. debug('Writing log event to socket');
  82. socket.write(loggingEvent.serialise(), 'utf8');
  83. socket.write(END_MSG, 'utf8');
  84. }
  85. function emptyBuffer() {
  86. let evt;
  87. debug('emptying worker buffer');
  88. /* eslint no-cond-assign:0 */
  89. while ((evt = buffer.shift())) {
  90. write(evt);
  91. }
  92. }
  93. function createSocket() {
  94. debug(`worker appender creating socket to ${config.loggerHost || 'localhost'}:${config.loggerPort || 5000}`);
  95. socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
  96. socket.on('connect', () => {
  97. debug('worker socket connected');
  98. emptyBuffer();
  99. canWrite = true;
  100. });
  101. socket.on('timeout', socket.end.bind(socket));
  102. // don't bother listening for 'error', 'close' gets called after that anyway
  103. socket.on('close', createSocket);
  104. }
  105. createSocket();
  106. function log(loggingEvent) {
  107. if (canWrite) {
  108. write(loggingEvent);
  109. } else {
  110. debug('worker buffering log event because it cannot write at the moment');
  111. buffer.push(loggingEvent);
  112. }
  113. }
  114. log.shutdown = function (cb) {
  115. debug('worker shutdown called');
  116. if (buffer.length && shutdownAttempts) {
  117. debug('worker buffer has items, waiting 100ms to empty');
  118. shutdownAttempts -= 1;
  119. setTimeout(() => {
  120. log.shutdown(cb);
  121. }, 100);
  122. } else {
  123. socket.removeAllListeners('close');
  124. socket.end(cb);
  125. }
  126. };
  127. return log;
  128. }
  129. function createAppender(config, appender, levels) {
  130. if (config.mode === 'master') {
  131. debug('Creating master appender');
  132. return logServer(config, appender, levels);
  133. }
  134. debug('Creating worker appender');
  135. return workerAppender(config);
  136. }
  137. function configure(config, layouts, findAppender, levels) {
  138. let appender;
  139. debug(`configure with mode = ${config.mode}`);
  140. if (config.mode === 'master') {
  141. if (!config.appender) {
  142. debug(`no appender found in config ${config}`);
  143. throw new Error('multiprocess master must have an "appender" defined');
  144. }
  145. debug(`actual appender is ${config.appender}`);
  146. appender = findAppender(config.appender);
  147. if (!appender) {
  148. debug(`actual appender "${config.appender}" not found`);
  149. throw new Error(`multiprocess master appender "${config.appender}" not defined`);
  150. }
  151. }
  152. return createAppender(config, appender, levels);
  153. }
  154. module.exports.configure = configure;