migrating to the latest JUCE version

This commit is contained in:
2022-11-04 23:11:33 +01:00
committed by Nikolai Rodionov
parent 4257a0f8ba
commit faf8f18333
2796 changed files with 888518 additions and 784244 deletions

View File

@ -1,267 +1,297 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2020 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
enum { magicMastSlaveConnectionHeader = 0x712baf04 };
static const char* startMessage = "__ipc_st";
static const char* killMessage = "__ipc_k_";
static const char* pingMessage = "__ipc_p_";
enum { specialMessageSize = 8, defaultTimeoutMs = 8000 };
static bool isMessageType (const MemoryBlock& mb, const char* messageType) noexcept
{
return mb.matches (messageType, (size_t) specialMessageSize);
}
static String getCommandLinePrefix (const String& commandLineUniqueID)
{
return "--" + commandLineUniqueID + ":";
}
//==============================================================================
// This thread sends and receives ping messages every second, so that it
// can find out if the other process has stopped running.
struct ChildProcessPingThread : public Thread,
private AsyncUpdater
{
ChildProcessPingThread (int timeout) : Thread ("IPC ping"), timeoutMs (timeout)
{
pingReceived();
}
void pingReceived() noexcept { countdown = timeoutMs / 1000 + 1; }
void triggerConnectionLostMessage() { triggerAsyncUpdate(); }
virtual bool sendPingMessage (const MemoryBlock&) = 0;
virtual void pingFailed() = 0;
int timeoutMs;
private:
Atomic<int> countdown;
void handleAsyncUpdate() override { pingFailed(); }
void run() override
{
while (! threadShouldExit())
{
if (--countdown <= 0 || ! sendPingMessage ({ pingMessage, specialMessageSize }))
{
triggerConnectionLostMessage();
break;
}
wait (1000);
}
}
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ChildProcessPingThread)
};
//==============================================================================
struct ChildProcessMaster::Connection : public InterprocessConnection,
private ChildProcessPingThread
{
Connection (ChildProcessMaster& m, const String& pipeName, int timeout)
: InterprocessConnection (false, magicMastSlaveConnectionHeader),
ChildProcessPingThread (timeout),
owner (m)
{
if (createPipe (pipeName, timeoutMs))
startThread (4);
}
~Connection() override
{
stopThread (10000);
}
private:
void connectionMade() override {}
void connectionLost() override { owner.handleConnectionLost(); }
bool sendPingMessage (const MemoryBlock& m) override { return owner.sendMessageToSlave (m); }
void pingFailed() override { connectionLost(); }
void messageReceived (const MemoryBlock& m) override
{
pingReceived();
if (m.getSize() != specialMessageSize || ! isMessageType (m, pingMessage))
owner.handleMessageFromSlave (m);
}
ChildProcessMaster& owner;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Connection)
};
//==============================================================================
ChildProcessMaster::ChildProcessMaster() {}
ChildProcessMaster::~ChildProcessMaster()
{
killSlaveProcess();
}
void ChildProcessMaster::handleConnectionLost() {}
bool ChildProcessMaster::sendMessageToSlave (const MemoryBlock& mb)
{
if (connection != nullptr)
return connection->sendMessage (mb);
jassertfalse; // this can only be used when the connection is active!
return false;
}
bool ChildProcessMaster::launchSlaveProcess (const File& executable, const String& commandLineUniqueID,
int timeoutMs, int streamFlags)
{
killSlaveProcess();
auto pipeName = "p" + String::toHexString (Random().nextInt64());
StringArray args;
args.add (executable.getFullPathName());
args.add (getCommandLinePrefix (commandLineUniqueID) + pipeName);
childProcess.reset (new ChildProcess());
if (childProcess->start (args, streamFlags))
{
connection.reset (new Connection (*this, pipeName, timeoutMs <= 0 ? defaultTimeoutMs : timeoutMs));
if (connection->isConnected())
{
sendMessageToSlave ({ startMessage, specialMessageSize });
return true;
}
connection.reset();
}
return false;
}
void ChildProcessMaster::killSlaveProcess()
{
if (connection != nullptr)
{
sendMessageToSlave ({ killMessage, specialMessageSize });
connection->disconnect();
connection.reset();
}
childProcess.reset();
}
//==============================================================================
struct ChildProcessSlave::Connection : public InterprocessConnection,
private ChildProcessPingThread
{
Connection (ChildProcessSlave& p, const String& pipeName, int timeout)
: InterprocessConnection (false, magicMastSlaveConnectionHeader),
ChildProcessPingThread (timeout),
owner (p)
{
connectToPipe (pipeName, timeoutMs);
startThread (4);
}
~Connection() override
{
stopThread (10000);
}
private:
ChildProcessSlave& owner;
void connectionMade() override {}
void connectionLost() override { owner.handleConnectionLost(); }
bool sendPingMessage (const MemoryBlock& m) override { return owner.sendMessageToMaster (m); }
void pingFailed() override { connectionLost(); }
void messageReceived (const MemoryBlock& m) override
{
pingReceived();
if (isMessageType (m, pingMessage))
return;
if (isMessageType (m, killMessage))
return triggerConnectionLostMessage();
if (isMessageType (m, startMessage))
return owner.handleConnectionMade();
owner.handleMessageFromMaster (m);
}
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Connection)
};
//==============================================================================
ChildProcessSlave::ChildProcessSlave() {}
ChildProcessSlave::~ChildProcessSlave() {}
void ChildProcessSlave::handleConnectionMade() {}
void ChildProcessSlave::handleConnectionLost() {}
bool ChildProcessSlave::sendMessageToMaster (const MemoryBlock& mb)
{
if (connection != nullptr)
return connection->sendMessage (mb);
jassertfalse; // this can only be used when the connection is active!
return false;
}
bool ChildProcessSlave::initialiseFromCommandLine (const String& commandLine,
const String& commandLineUniqueID,
int timeoutMs)
{
auto prefix = getCommandLinePrefix (commandLineUniqueID);
if (commandLine.trim().startsWith (prefix))
{
auto pipeName = commandLine.fromFirstOccurrenceOf (prefix, false, false)
.upToFirstOccurrenceOf (" ", false, false).trim();
if (pipeName.isNotEmpty())
{
connection.reset (new Connection (*this, pipeName, timeoutMs <= 0 ? defaultTimeoutMs : timeoutMs));
if (! connection->isConnected())
connection.reset();
}
}
return connection != nullptr;
}
} // namespace juce
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
enum { magicCoordWorkerConnectionHeader = 0x712baf04 };
static const char* startMessage = "__ipc_st";
static const char* killMessage = "__ipc_k_";
static const char* pingMessage = "__ipc_p_";
enum { specialMessageSize = 8, defaultTimeoutMs = 8000 };
static bool isMessageType (const MemoryBlock& mb, const char* messageType) noexcept
{
return mb.matches (messageType, (size_t) specialMessageSize);
}
static String getCommandLinePrefix (const String& commandLineUniqueID)
{
return "--" + commandLineUniqueID + ":";
}
//==============================================================================
// This thread sends and receives ping messages every second, so that it
// can find out if the other process has stopped running.
struct ChildProcessPingThread : public Thread,
private AsyncUpdater
{
ChildProcessPingThread (int timeout) : Thread ("IPC ping"), timeoutMs (timeout)
{
pingReceived();
}
void startPinging() { startThread (4); }
void pingReceived() noexcept { countdown = timeoutMs / 1000 + 1; }
void triggerConnectionLostMessage() { triggerAsyncUpdate(); }
virtual bool sendPingMessage (const MemoryBlock&) = 0;
virtual void pingFailed() = 0;
int timeoutMs;
using AsyncUpdater::cancelPendingUpdate;
private:
Atomic<int> countdown;
void handleAsyncUpdate() override { pingFailed(); }
void run() override
{
while (! threadShouldExit())
{
if (--countdown <= 0 || ! sendPingMessage ({ pingMessage, specialMessageSize }))
{
triggerConnectionLostMessage();
break;
}
wait (1000);
}
}
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ChildProcessPingThread)
};
//==============================================================================
struct ChildProcessCoordinator::Connection : public InterprocessConnection,
private ChildProcessPingThread
{
Connection (ChildProcessCoordinator& m, const String& pipeName, int timeout)
: InterprocessConnection (false, magicCoordWorkerConnectionHeader),
ChildProcessPingThread (timeout),
owner (m)
{
createPipe (pipeName, timeoutMs);
}
~Connection() override
{
cancelPendingUpdate();
stopThread (10000);
}
using ChildProcessPingThread::startPinging;
private:
void connectionMade() override {}
void connectionLost() override { owner.handleConnectionLost(); }
bool sendPingMessage (const MemoryBlock& m) override { return owner.sendMessageToWorker (m); }
void pingFailed() override { connectionLost(); }
void messageReceived (const MemoryBlock& m) override
{
pingReceived();
if (m.getSize() != specialMessageSize || ! isMessageType (m, pingMessage))
owner.handleMessageFromWorker (m);
}
ChildProcessCoordinator& owner;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Connection)
};
//==============================================================================
ChildProcessCoordinator::ChildProcessCoordinator() = default;
ChildProcessCoordinator::~ChildProcessCoordinator()
{
killWorkerProcess();
}
void ChildProcessCoordinator::handleConnectionLost() {}
void ChildProcessCoordinator::handleMessageFromWorker (const MemoryBlock& mb)
{
JUCE_BEGIN_IGNORE_WARNINGS_GCC_LIKE ("-Wdeprecated-declarations")
JUCE_BEGIN_IGNORE_WARNINGS_MSVC (4996)
handleMessageFromSlave (mb);
JUCE_END_IGNORE_WARNINGS_GCC_LIKE
JUCE_END_IGNORE_WARNINGS_MSVC
}
bool ChildProcessCoordinator::sendMessageToWorker (const MemoryBlock& mb)
{
if (connection != nullptr)
return connection->sendMessage (mb);
jassertfalse; // this can only be used when the connection is active!
return false;
}
bool ChildProcessCoordinator::launchWorkerProcess (const File& executable, const String& commandLineUniqueID,
int timeoutMs, int streamFlags)
{
killWorkerProcess();
auto pipeName = "p" + String::toHexString (Random().nextInt64());
StringArray args;
args.add (executable.getFullPathName());
args.add (getCommandLinePrefix (commandLineUniqueID) + pipeName);
childProcess.reset (new ChildProcess());
if (childProcess->start (args, streamFlags))
{
connection.reset (new Connection (*this, pipeName, timeoutMs <= 0 ? defaultTimeoutMs : timeoutMs));
if (connection->isConnected())
{
connection->startPinging();
sendMessageToWorker ({ startMessage, specialMessageSize });
return true;
}
connection.reset();
}
return false;
}
void ChildProcessCoordinator::killWorkerProcess()
{
if (connection != nullptr)
{
sendMessageToWorker ({ killMessage, specialMessageSize });
connection->disconnect();
connection.reset();
}
childProcess.reset();
}
//==============================================================================
struct ChildProcessWorker::Connection : public InterprocessConnection,
private ChildProcessPingThread
{
Connection (ChildProcessWorker& p, const String& pipeName, int timeout)
: InterprocessConnection (false, magicCoordWorkerConnectionHeader),
ChildProcessPingThread (timeout),
owner (p)
{
connectToPipe (pipeName, timeoutMs);
}
~Connection() override
{
cancelPendingUpdate();
stopThread (10000);
disconnect();
}
using ChildProcessPingThread::startPinging;
private:
ChildProcessWorker& owner;
void connectionMade() override {}
void connectionLost() override { owner.handleConnectionLost(); }
bool sendPingMessage (const MemoryBlock& m) override { return owner.sendMessageToCoordinator (m); }
void pingFailed() override { connectionLost(); }
void messageReceived (const MemoryBlock& m) override
{
pingReceived();
if (isMessageType (m, pingMessage))
return;
if (isMessageType (m, killMessage))
return triggerConnectionLostMessage();
if (isMessageType (m, startMessage))
return owner.handleConnectionMade();
owner.handleMessageFromCoordinator (m);
}
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Connection)
};
//==============================================================================
ChildProcessWorker::ChildProcessWorker() = default;
ChildProcessWorker::~ChildProcessWorker() = default;
void ChildProcessWorker::handleConnectionMade() {}
void ChildProcessWorker::handleConnectionLost() {}
void ChildProcessWorker::handleMessageFromCoordinator (const MemoryBlock& mb)
{
JUCE_BEGIN_IGNORE_WARNINGS_GCC_LIKE ("-Wdeprecated-declarations")
JUCE_BEGIN_IGNORE_WARNINGS_MSVC (4996)
handleMessageFromMaster (mb);
JUCE_END_IGNORE_WARNINGS_GCC_LIKE
JUCE_END_IGNORE_WARNINGS_MSVC
}
bool ChildProcessWorker::sendMessageToCoordinator (const MemoryBlock& mb)
{
if (connection != nullptr)
return connection->sendMessage (mb);
jassertfalse; // this can only be used when the connection is active!
return false;
}
bool ChildProcessWorker::initialiseFromCommandLine (const String& commandLine,
const String& commandLineUniqueID,
int timeoutMs)
{
auto prefix = getCommandLinePrefix (commandLineUniqueID);
if (commandLine.trim().startsWith (prefix))
{
auto pipeName = commandLine.fromFirstOccurrenceOf (prefix, false, false)
.upToFirstOccurrenceOf (" ", false, false).trim();
if (pipeName.isNotEmpty())
{
connection.reset (new Connection (*this, pipeName, timeoutMs <= 0 ? defaultTimeoutMs : timeoutMs));
if (connection->isConnected())
connection->startPinging();
else
connection.reset();
}
}
return connection != nullptr;
}
} // namespace juce

View File

@ -1,200 +1,228 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2020 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
//==============================================================================
/**
Acts as the slave end of a master/slave pair of connected processes.
The ChildProcessSlave and ChildProcessMaster classes make it easy for an app
to spawn a child process, and to manage a 2-way messaging connection to control it.
To use the system, you need to create subclasses of both ChildProcessSlave and
ChildProcessMaster. To instantiate the ChildProcessSlave object, you must
add some code to your main() or JUCEApplication::initialise() function that
calls the initialiseFromCommandLine() method to check the app's command-line
parameters to see whether it's being launched as a child process. If this returns
true then the slave process can be allowed to run, and its handleMessageFromMaster()
method will be called whenever a message arrives.
The juce demo app has a good example of this class in action.
@see ChildProcessMaster, InterprocessConnection, ChildProcess
@tags{Events}
*/
class JUCE_API ChildProcessSlave
{
public:
/** Creates a non-connected slave process.
Use initialiseFromCommandLine to connect to a master process.
*/
ChildProcessSlave();
/** Destructor. */
virtual ~ChildProcessSlave();
/** This checks some command-line parameters to see whether they were generated by
ChildProcessMaster::launchSlaveProcess(), and if so, connects to that master process.
In an exe that can be used as a child process, you should add some code to your
main() or JUCEApplication::initialise() that calls this method.
The commandLineUniqueID should be a short alphanumeric identifier (no spaces!)
that matches the string passed to ChildProcessMaster::launchSlaveProcess().
The timeoutMs parameter lets you specify how long the child process is allowed
to run without receiving a ping from the master before the master is considered to
have died, and handleConnectionLost() will be called. Passing <= 0 for this timeout
makes it use a default value.
Returns true if the command-line matches and the connection is made successfully.
*/
bool initialiseFromCommandLine (const String& commandLine,
const String& commandLineUniqueID,
int timeoutMs = 0);
//==============================================================================
/** This will be called to deliver messages from the master process.
The call will probably be made on a background thread, so be careful with your
thread-safety! You may want to respond by sending back a message with
sendMessageToMaster()
*/
virtual void handleMessageFromMaster (const MemoryBlock&) = 0;
/** This will be called when the master process finishes connecting to this slave.
The call will probably be made on a background thread, so be careful with your thread-safety!
*/
virtual void handleConnectionMade();
/** This will be called when the connection to the master process is lost.
The call may be made from any thread (including the message thread).
Typically, if your process only exists to act as a slave, you should probably exit
when this happens.
*/
virtual void handleConnectionLost();
/** Tries to send a message to the master process.
This returns true if the message was sent, but doesn't check that it actually gets
delivered at the other end. If successful, the data will emerge in a call to your
ChildProcessMaster::handleMessageFromSlave().
*/
bool sendMessageToMaster (const MemoryBlock&);
private:
struct Connection;
std::unique_ptr<Connection> connection;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ChildProcessSlave)
};
//==============================================================================
/**
Acts as the master in a master/slave pair of connected processes.
The ChildProcessSlave and ChildProcessMaster classes make it easy for an app
to spawn a child process, and to manage a 2-way messaging connection to control it.
To use the system, you need to create subclasses of both ChildProcessSlave and
ChildProcessMaster. When you want your master process to launch the slave, you
just call launchSlaveProcess(), and it'll attempt to launch the executable that
you specify (which may be the same exe), and assuming it has been set-up to
correctly parse the command-line parameters (see ChildProcessSlave) then a
two-way connection will be created.
The juce demo app has a good example of this class in action.
@see ChildProcessSlave, InterprocessConnection, ChildProcess
@tags{Events}
*/
class JUCE_API ChildProcessMaster
{
public:
/** Creates an uninitialised master process object.
Use launchSlaveProcess to launch and connect to a child process.
*/
ChildProcessMaster();
/** Destructor.
Note that the destructor calls killSlaveProcess(), but doesn't wait for
the child process to finish terminating.
*/
virtual ~ChildProcessMaster();
/** Attempts to launch and connect to a slave process.
This will start the given executable, passing it a special command-line
parameter based around the commandLineUniqueID string, which must be a
short alphanumeric string (no spaces!) that identifies your app. The exe
that gets launched must respond by calling ChildProcessSlave::initialiseFromCommandLine()
in its startup code, and must use a matching ID to commandLineUniqueID.
The timeoutMs parameter lets you specify how long the child process is allowed
to go without sending a ping before it is considered to have died and
handleConnectionLost() will be called. Passing <= 0 for this timeout makes
it use a default value.
If this all works, the method returns true, and you can begin sending and
receiving messages with the slave process.
If a child process is already running, this will call killSlaveProcess() and
start a new one.
*/
bool launchSlaveProcess (const File& executableToLaunch,
const String& commandLineUniqueID,
int timeoutMs = 0,
int streamFlags = ChildProcess::wantStdOut | ChildProcess::wantStdErr);
/** Sends a kill message to the slave, and disconnects from it.
Note that this won't wait for it to terminate.
*/
void killSlaveProcess();
/** This will be called to deliver a message from the slave process.
The call will probably be made on a background thread, so be careful with your thread-safety!
*/
virtual void handleMessageFromSlave (const MemoryBlock&) = 0;
/** This will be called when the slave process dies or is somehow disconnected.
The call will probably be made on a background thread, so be careful with your thread-safety!
*/
virtual void handleConnectionLost();
/** Attempts to send a message to the slave process.
This returns true if the message was dispatched, but doesn't check that it actually
gets delivered at the other end. If successful, the data will emerge in a call to
your ChildProcessSlave::handleMessageFromMaster().
*/
bool sendMessageToSlave (const MemoryBlock&);
private:
std::unique_ptr<ChildProcess> childProcess;
struct Connection;
std::unique_ptr<Connection> connection;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ChildProcessMaster)
};
} // namespace juce
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
//==============================================================================
/**
Acts as the worker end of a coordinator/worker pair of connected processes.
The ChildProcessWorker and ChildProcessCoordinator classes make it easy for an app
to spawn a child process, and to manage a 2-way messaging connection to control it.
To use the system, you need to create subclasses of both ChildProcessWorker and
ChildProcessCoordinator. To instantiate the ChildProcessWorker object, you must
add some code to your main() or JUCEApplication::initialise() function that
calls the initialiseFromCommandLine() method to check the app's command-line
parameters to see whether it's being launched as a child process. If this returns
true then the worker process can be allowed to run, and its handleMessageFromCoordinator()
method will be called whenever a message arrives.
The juce demo app has a good example of this class in action.
@see ChildProcessCoordinator, InterprocessConnection, ChildProcess
@tags{Events}
*/
class JUCE_API ChildProcessWorker
{
public:
/** Creates a non-connected worker process.
Use initialiseFromCommandLine to connect to a coordinator process.
*/
ChildProcessWorker();
/** Destructor. */
virtual ~ChildProcessWorker();
/** This checks some command-line parameters to see whether they were generated by
ChildProcessCoordinator::launchWorkerProcess(), and if so, connects to that coordinator process.
In an exe that can be used as a child process, you should add some code to your
main() or JUCEApplication::initialise() that calls this method.
The commandLineUniqueID should be a short alphanumeric identifier (no spaces!)
that matches the string passed to ChildProcessCoordinator::launchWorkerProcess().
The timeoutMs parameter lets you specify how long the child process is allowed
to run without receiving a ping from the coordinator before the coordinator is considered to
have died, and handleConnectionLost() will be called. Passing <= 0 for this timeout
makes it use a default value.
Returns true if the command-line matches and the connection is made successfully.
*/
bool initialiseFromCommandLine (const String& commandLine,
const String& commandLineUniqueID,
int timeoutMs = 0);
//==============================================================================
/** This will be called to deliver messages from the coordinator process.
The call will probably be made on a background thread, so be careful with your
thread-safety! You may want to respond by sending back a message with
sendMessageToCoordinator()
*/
virtual void handleMessageFromCoordinator (const MemoryBlock& mb);
[[deprecated ("Replaced by handleMessageFromCoordinator.")]]
virtual void handleMessageFromMaster (const MemoryBlock&) {}
/** This will be called when the coordinator process finishes connecting to this worker.
The call will probably be made on a background thread, so be careful with your thread-safety!
*/
virtual void handleConnectionMade();
/** This will be called when the connection to the coordinator process is lost.
The call may be made from any thread (including the message thread).
Typically, if your process only exists to act as a worker, you should probably exit
when this happens.
*/
virtual void handleConnectionLost();
/** Tries to send a message to the coordinator process.
This returns true if the message was sent, but doesn't check that it actually gets
delivered at the other end. If successful, the data will emerge in a call to your
ChildProcessCoordinator::handleMessageFromWorker().
*/
bool sendMessageToCoordinator (const MemoryBlock&);
[[deprecated ("Replaced by sendMessageToCoordinator.")]]
bool sendMessageToMaster (const MemoryBlock& mb) { return sendMessageToCoordinator (mb); }
private:
struct Connection;
std::unique_ptr<Connection> connection;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ChildProcessWorker)
};
using ChildProcessSlave [[deprecated ("Replaced by ChildProcessWorker.")]] = ChildProcessWorker;
//==============================================================================
/**
Acts as the coordinator in a coordinator/worker pair of connected processes.
The ChildProcessWorker and ChildProcessCoordinator classes make it easy for an app
to spawn a child process, and to manage a 2-way messaging connection to control it.
To use the system, you need to create subclasses of both ChildProcessWorker and
ChildProcessCoordinator. When you want your coordinator process to launch the worker, you
just call launchWorkerProcess(), and it'll attempt to launch the executable that
you specify (which may be the same exe), and assuming it has been set-up to
correctly parse the command-line parameters (see ChildProcessWorker) then a
two-way connection will be created.
The juce demo app has a good example of this class in action.
@see ChildProcessWorker, InterprocessConnection, ChildProcess
@tags{Events}
*/
class JUCE_API ChildProcessCoordinator
{
public:
/** Creates an uninitialised coordinator process object.
Use launchWorkerProcess to launch and connect to a child process.
*/
ChildProcessCoordinator();
/** Destructor.
Note that the destructor calls killWorkerProcess(), but doesn't wait for
the child process to finish terminating.
*/
virtual ~ChildProcessCoordinator();
/** Attempts to launch and connect to a worker process.
This will start the given executable, passing it a special command-line
parameter based around the commandLineUniqueID string, which must be a
short alphanumeric string (no spaces!) that identifies your app. The exe
that gets launched must respond by calling ChildProcessWorker::initialiseFromCommandLine()
in its startup code, and must use a matching ID to commandLineUniqueID.
The timeoutMs parameter lets you specify how long the child process is allowed
to go without sending a ping before it is considered to have died and
handleConnectionLost() will be called. Passing <= 0 for this timeout makes
it use a default value.
If this all works, the method returns true, and you can begin sending and
receiving messages with the worker process.
If a child process is already running, this will call killWorkerProcess() and
start a new one.
*/
bool launchWorkerProcess (const File& executableToLaunch,
const String& commandLineUniqueID,
int timeoutMs = 0,
int streamFlags = ChildProcess::wantStdOut | ChildProcess::wantStdErr);
[[deprecated ("Replaced by launchWorkerProcess.")]]
bool launchSlaveProcess (const File& executableToLaunch,
const String& commandLineUniqueID,
int timeoutMs = 0,
int streamFlags = ChildProcess::wantStdOut | ChildProcess::wantStdErr)
{
return launchWorkerProcess (executableToLaunch, commandLineUniqueID, timeoutMs, streamFlags);
}
/** Sends a kill message to the worker, and disconnects from it.
Note that this won't wait for it to terminate.
*/
void killWorkerProcess();
[[deprecated ("Replaced by killWorkerProcess.")]]
void killSlaveProcess() { killWorkerProcess(); }
/** This will be called to deliver a message from the worker process.
The call will probably be made on a background thread, so be careful with your thread-safety!
*/
virtual void handleMessageFromWorker (const MemoryBlock&);
[[deprecated ("Replaced by handleMessageFromWorker")]]
virtual void handleMessageFromSlave (const MemoryBlock&) {}
/** This will be called when the worker process dies or is somehow disconnected.
The call will probably be made on a background thread, so be careful with your thread-safety!
*/
virtual void handleConnectionLost();
/** Attempts to send a message to the worker process.
This returns true if the message was dispatched, but doesn't check that it actually
gets delivered at the other end. If successful, the data will emerge in a call to
your ChildProcessWorker::handleMessageFromCoordinator().
*/
bool sendMessageToWorker (const MemoryBlock&);
[[deprecated ("Replaced by sendMessageToWorker.")]]
bool sendMessageToSlave (const MemoryBlock& mb) { return sendMessageToWorker (mb); }
private:
std::unique_ptr<ChildProcess> childProcess;
struct Connection;
std::unique_ptr<Connection> connection;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ChildProcessCoordinator)
};
using ChildProcessMaster [[deprecated ("Replaced by ChildProcessCoordinator.")]] = ChildProcessCoordinator;
} // namespace juce

View File

@ -1,431 +1,431 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2020 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
struct InterprocessConnection::ConnectionThread : public Thread
{
ConnectionThread (InterprocessConnection& c) : Thread ("JUCE IPC"), owner (c) {}
void run() override { owner.runThread(); }
InterprocessConnection& owner;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
};
class SafeActionImpl
{
public:
explicit SafeActionImpl (InterprocessConnection& p)
: ref (p) {}
template <typename Fn>
void ifSafe (Fn&& fn)
{
const ScopedLock lock (mutex);
if (safe)
fn (ref);
}
void setSafe (bool s)
{
const ScopedLock lock (mutex);
safe = s;
}
bool isSafe()
{
const ScopedLock lock (mutex);
return safe;
}
private:
CriticalSection mutex;
InterprocessConnection& ref;
bool safe = false;
};
class InterprocessConnection::SafeAction : public SafeActionImpl
{
using SafeActionImpl::SafeActionImpl;
};
//==============================================================================
InterprocessConnection::InterprocessConnection (bool callbacksOnMessageThread, uint32 magicMessageHeaderNumber)
: useMessageThread (callbacksOnMessageThread),
magicMessageHeader (magicMessageHeaderNumber),
safeAction (std::make_shared<SafeAction> (*this))
{
thread.reset (new ConnectionThread (*this));
}
InterprocessConnection::~InterprocessConnection()
{
// You *must* call `disconnect` in the destructor of your derived class to ensure
// that any pending messages are not delivered. If the messages were delivered after
// destroying the derived class, we'd end up calling the pure virtual implementations
// of `messageReceived`, `connectionMade` and `connectionLost` which is definitely
// not a good idea!
jassert (! safeAction->isSafe());
callbackConnectionState = false;
disconnect (4000, Notify::no);
thread.reset();
}
//==============================================================================
bool InterprocessConnection::connectToSocket (const String& hostName,
int portNumber, int timeOutMillisecs)
{
disconnect();
auto s = std::make_unique<StreamingSocket>();
if (s->connect (hostName, portNumber, timeOutMillisecs))
{
const ScopedWriteLock sl (pipeAndSocketLock);
initialiseWithSocket (std::move (s));
return true;
}
return false;
}
bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutMs)
{
disconnect();
auto newPipe = std::make_unique<NamedPipe>();
if (newPipe->openExisting (pipeName))
{
const ScopedWriteLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (std::move (newPipe));
return true;
}
return false;
}
bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs, bool mustNotExist)
{
disconnect();
auto newPipe = std::make_unique<NamedPipe>();
if (newPipe->createNewPipe (pipeName, mustNotExist))
{
const ScopedWriteLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (std::move (newPipe));
return true;
}
return false;
}
void InterprocessConnection::disconnect (int timeoutMs, Notify notify)
{
thread->signalThreadShouldExit();
{
const ScopedReadLock sl (pipeAndSocketLock);
if (socket != nullptr) socket->close();
if (pipe != nullptr) pipe->close();
}
thread->stopThread (timeoutMs);
deletePipeAndSocket();
if (notify == Notify::yes)
connectionLostInt();
callbackConnectionState = false;
safeAction->setSafe (false);
}
void InterprocessConnection::deletePipeAndSocket()
{
const ScopedWriteLock sl (pipeAndSocketLock);
socket.reset();
pipe.reset();
}
bool InterprocessConnection::isConnected() const
{
const ScopedReadLock sl (pipeAndSocketLock);
return ((socket != nullptr && socket->isConnected())
|| (pipe != nullptr && pipe->isOpen()))
&& threadIsRunning;
}
String InterprocessConnection::getConnectedHostName() const
{
{
const ScopedReadLock sl (pipeAndSocketLock);
if (pipe == nullptr && socket == nullptr)
return {};
if (socket != nullptr && ! socket->isLocal())
return socket->getHostName();
}
return IPAddress::local().toString();
}
//==============================================================================
bool InterprocessConnection::sendMessage (const MemoryBlock& message)
{
uint32 messageHeader[2] = { ByteOrder::swapIfBigEndian (magicMessageHeader),
ByteOrder::swapIfBigEndian ((uint32) message.getSize()) };
MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
return writeData (messageData.getData(), (int) messageData.getSize()) == (int) messageData.getSize();
}
int InterprocessConnection::writeData (void* data, int dataSize)
{
const ScopedReadLock sl (pipeAndSocketLock);
if (socket != nullptr)
return socket->write (data, dataSize);
if (pipe != nullptr)
return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
return 0;
}
//==============================================================================
void InterprocessConnection::initialise()
{
safeAction->setSafe (true);
threadIsRunning = true;
connectionMadeInt();
thread->startThread();
}
void InterprocessConnection::initialiseWithSocket (std::unique_ptr<StreamingSocket> newSocket)
{
jassert (socket == nullptr && pipe == nullptr);
socket = std::move (newSocket);
initialise();
}
void InterprocessConnection::initialiseWithPipe (std::unique_ptr<NamedPipe> newPipe)
{
jassert (socket == nullptr && pipe == nullptr);
pipe = std::move (newPipe);
initialise();
}
//==============================================================================
struct ConnectionStateMessage : public MessageManager::MessageBase
{
ConnectionStateMessage (std::shared_ptr<SafeActionImpl> ipc, bool connected) noexcept
: safeAction (ipc), connectionMade (connected)
{}
void messageCallback() override
{
safeAction->ifSafe ([this] (InterprocessConnection& owner)
{
if (connectionMade)
owner.connectionMade();
else
owner.connectionLost();
});
}
std::shared_ptr<SafeActionImpl> safeAction;
bool connectionMade;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
};
void InterprocessConnection::connectionMadeInt()
{
if (! callbackConnectionState)
{
callbackConnectionState = true;
if (useMessageThread)
(new ConnectionStateMessage (safeAction, true))->post();
else
connectionMade();
}
}
void InterprocessConnection::connectionLostInt()
{
if (callbackConnectionState)
{
callbackConnectionState = false;
if (useMessageThread)
(new ConnectionStateMessage (safeAction, false))->post();
else
connectionLost();
}
}
struct DataDeliveryMessage : public Message
{
DataDeliveryMessage (std::shared_ptr<SafeActionImpl> ipc, const MemoryBlock& d)
: safeAction (ipc), data (d)
{}
void messageCallback() override
{
safeAction->ifSafe ([this] (InterprocessConnection& owner)
{
owner.messageReceived (data);
});
}
std::shared_ptr<SafeActionImpl> safeAction;
MemoryBlock data;
};
void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
{
jassert (callbackConnectionState);
if (useMessageThread)
(new DataDeliveryMessage (safeAction, data))->post();
else
messageReceived (data);
}
//==============================================================================
int InterprocessConnection::readData (void* data, int num)
{
const ScopedReadLock sl (pipeAndSocketLock);
if (socket != nullptr)
return socket->read (data, num, true);
if (pipe != nullptr)
return pipe->read (data, num, pipeReceiveMessageTimeout);
jassertfalse;
return -1;
}
bool InterprocessConnection::readNextMessage()
{
uint32 messageHeader[2];
auto bytes = readData (messageHeader, sizeof (messageHeader));
if (bytes == (int) sizeof (messageHeader)
&& ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
{
auto bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
if (bytesInMessage > 0)
{
MemoryBlock messageData ((size_t) bytesInMessage, true);
int bytesRead = 0;
while (bytesInMessage > 0)
{
if (thread->threadShouldExit())
return false;
auto numThisTime = jmin (bytesInMessage, 65536);
auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
if (bytesIn <= 0)
break;
bytesRead += bytesIn;
bytesInMessage -= bytesIn;
}
if (bytesRead >= 0)
deliverDataInt (messageData);
}
return true;
}
if (bytes < 0)
{
if (socket != nullptr)
deletePipeAndSocket();
connectionLostInt();
}
return false;
}
void InterprocessConnection::runThread()
{
while (! thread->threadShouldExit())
{
if (socket != nullptr)
{
auto ready = socket->waitUntilReady (true, 100);
if (ready < 0)
{
deletePipeAndSocket();
connectionLostInt();
break;
}
if (ready == 0)
{
thread->wait (1);
continue;
}
}
else if (pipe != nullptr)
{
if (! pipe->isOpen())
{
deletePipeAndSocket();
connectionLostInt();
break;
}
}
else
{
break;
}
if (thread->threadShouldExit() || ! readNextMessage())
break;
}
threadIsRunning = false;
}
} // namespace juce
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
struct InterprocessConnection::ConnectionThread : public Thread
{
ConnectionThread (InterprocessConnection& c) : Thread ("JUCE IPC"), owner (c) {}
void run() override { owner.runThread(); }
InterprocessConnection& owner;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
};
class SafeActionImpl
{
public:
explicit SafeActionImpl (InterprocessConnection& p)
: ref (p) {}
template <typename Fn>
void ifSafe (Fn&& fn)
{
const ScopedLock lock (mutex);
if (safe)
fn (ref);
}
void setSafe (bool s)
{
const ScopedLock lock (mutex);
safe = s;
}
bool isSafe()
{
const ScopedLock lock (mutex);
return safe;
}
private:
CriticalSection mutex;
InterprocessConnection& ref;
bool safe = false;
};
class InterprocessConnection::SafeAction : public SafeActionImpl
{
using SafeActionImpl::SafeActionImpl;
};
//==============================================================================
InterprocessConnection::InterprocessConnection (bool callbacksOnMessageThread, uint32 magicMessageHeaderNumber)
: useMessageThread (callbacksOnMessageThread),
magicMessageHeader (magicMessageHeaderNumber),
safeAction (std::make_shared<SafeAction> (*this))
{
thread.reset (new ConnectionThread (*this));
}
InterprocessConnection::~InterprocessConnection()
{
// You *must* call `disconnect` in the destructor of your derived class to ensure
// that any pending messages are not delivered. If the messages were delivered after
// destroying the derived class, we'd end up calling the pure virtual implementations
// of `messageReceived`, `connectionMade` and `connectionLost` which is definitely
// not a good idea!
jassert (! safeAction->isSafe());
callbackConnectionState = false;
disconnect (4000, Notify::no);
thread.reset();
}
//==============================================================================
bool InterprocessConnection::connectToSocket (const String& hostName,
int portNumber, int timeOutMillisecs)
{
disconnect();
auto s = std::make_unique<StreamingSocket>();
if (s->connect (hostName, portNumber, timeOutMillisecs))
{
const ScopedWriteLock sl (pipeAndSocketLock);
initialiseWithSocket (std::move (s));
return true;
}
return false;
}
bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutMs)
{
disconnect();
auto newPipe = std::make_unique<NamedPipe>();
if (newPipe->openExisting (pipeName))
{
const ScopedWriteLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (std::move (newPipe));
return true;
}
return false;
}
bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs, bool mustNotExist)
{
disconnect();
auto newPipe = std::make_unique<NamedPipe>();
if (newPipe->createNewPipe (pipeName, mustNotExist))
{
const ScopedWriteLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (std::move (newPipe));
return true;
}
return false;
}
void InterprocessConnection::disconnect (int timeoutMs, Notify notify)
{
thread->signalThreadShouldExit();
{
const ScopedReadLock sl (pipeAndSocketLock);
if (socket != nullptr) socket->close();
if (pipe != nullptr) pipe->close();
}
thread->stopThread (timeoutMs);
deletePipeAndSocket();
if (notify == Notify::yes)
connectionLostInt();
callbackConnectionState = false;
safeAction->setSafe (false);
}
void InterprocessConnection::deletePipeAndSocket()
{
const ScopedWriteLock sl (pipeAndSocketLock);
socket.reset();
pipe.reset();
}
bool InterprocessConnection::isConnected() const
{
const ScopedReadLock sl (pipeAndSocketLock);
return ((socket != nullptr && socket->isConnected())
|| (pipe != nullptr && pipe->isOpen()))
&& threadIsRunning;
}
String InterprocessConnection::getConnectedHostName() const
{
{
const ScopedReadLock sl (pipeAndSocketLock);
if (pipe == nullptr && socket == nullptr)
return {};
if (socket != nullptr && ! socket->isLocal())
return socket->getHostName();
}
return IPAddress::local().toString();
}
//==============================================================================
bool InterprocessConnection::sendMessage (const MemoryBlock& message)
{
uint32 messageHeader[2] = { ByteOrder::swapIfBigEndian (magicMessageHeader),
ByteOrder::swapIfBigEndian ((uint32) message.getSize()) };
MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
return writeData (messageData.getData(), (int) messageData.getSize()) == (int) messageData.getSize();
}
int InterprocessConnection::writeData (void* data, int dataSize)
{
const ScopedReadLock sl (pipeAndSocketLock);
if (socket != nullptr)
return socket->write (data, dataSize);
if (pipe != nullptr)
return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
return 0;
}
//==============================================================================
void InterprocessConnection::initialise()
{
safeAction->setSafe (true);
threadIsRunning = true;
connectionMadeInt();
thread->startThread();
}
void InterprocessConnection::initialiseWithSocket (std::unique_ptr<StreamingSocket> newSocket)
{
jassert (socket == nullptr && pipe == nullptr);
socket = std::move (newSocket);
initialise();
}
void InterprocessConnection::initialiseWithPipe (std::unique_ptr<NamedPipe> newPipe)
{
jassert (socket == nullptr && pipe == nullptr);
pipe = std::move (newPipe);
initialise();
}
//==============================================================================
struct ConnectionStateMessage : public MessageManager::MessageBase
{
ConnectionStateMessage (std::shared_ptr<SafeActionImpl> ipc, bool connected) noexcept
: safeAction (ipc), connectionMade (connected)
{}
void messageCallback() override
{
safeAction->ifSafe ([this] (InterprocessConnection& owner)
{
if (connectionMade)
owner.connectionMade();
else
owner.connectionLost();
});
}
std::shared_ptr<SafeActionImpl> safeAction;
bool connectionMade;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
};
void InterprocessConnection::connectionMadeInt()
{
if (! callbackConnectionState)
{
callbackConnectionState = true;
if (useMessageThread)
(new ConnectionStateMessage (safeAction, true))->post();
else
connectionMade();
}
}
void InterprocessConnection::connectionLostInt()
{
if (callbackConnectionState)
{
callbackConnectionState = false;
if (useMessageThread)
(new ConnectionStateMessage (safeAction, false))->post();
else
connectionLost();
}
}
struct DataDeliveryMessage : public Message
{
DataDeliveryMessage (std::shared_ptr<SafeActionImpl> ipc, const MemoryBlock& d)
: safeAction (ipc), data (d)
{}
void messageCallback() override
{
safeAction->ifSafe ([this] (InterprocessConnection& owner)
{
owner.messageReceived (data);
});
}
std::shared_ptr<SafeActionImpl> safeAction;
MemoryBlock data;
};
void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
{
jassert (callbackConnectionState);
if (useMessageThread)
(new DataDeliveryMessage (safeAction, data))->post();
else
messageReceived (data);
}
//==============================================================================
int InterprocessConnection::readData (void* data, int num)
{
const ScopedReadLock sl (pipeAndSocketLock);
if (socket != nullptr)
return socket->read (data, num, true);
if (pipe != nullptr)
return pipe->read (data, num, pipeReceiveMessageTimeout);
jassertfalse;
return -1;
}
bool InterprocessConnection::readNextMessage()
{
uint32 messageHeader[2];
auto bytes = readData (messageHeader, sizeof (messageHeader));
if (bytes == (int) sizeof (messageHeader)
&& ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
{
auto bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
if (bytesInMessage > 0)
{
MemoryBlock messageData ((size_t) bytesInMessage, true);
int bytesRead = 0;
while (bytesInMessage > 0)
{
if (thread->threadShouldExit())
return false;
auto numThisTime = jmin (bytesInMessage, 65536);
auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
if (bytesIn <= 0)
break;
bytesRead += bytesIn;
bytesInMessage -= bytesIn;
}
if (bytesRead >= 0)
deliverDataInt (messageData);
}
return true;
}
if (bytes < 0)
{
if (socket != nullptr)
deletePipeAndSocket();
connectionLostInt();
}
return false;
}
void InterprocessConnection::runThread()
{
while (! thread->threadShouldExit())
{
if (socket != nullptr)
{
auto ready = socket->waitUntilReady (true, 100);
if (ready < 0)
{
deletePipeAndSocket();
connectionLostInt();
break;
}
if (ready == 0)
{
thread->wait (1);
continue;
}
}
else if (pipe != nullptr)
{
if (! pipe->isOpen())
{
deletePipeAndSocket();
connectionLostInt();
break;
}
}
else
{
break;
}
if (thread->threadShouldExit() || ! readNextMessage())
break;
}
threadIsRunning = false;
}
} // namespace juce

View File

@ -1,226 +1,226 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2020 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
class InterprocessConnectionServer;
class MemoryBlock;
//==============================================================================
/**
Manages a simple two-way messaging connection to another process, using either
a socket or a named pipe as the transport medium.
To connect to a waiting socket or an open pipe, use the connectToSocket() or
connectToPipe() methods. If this succeeds, messages can be sent to the other end,
and incoming messages will result in a callback via the messageReceived()
method.
To open a pipe and wait for another client to connect to it, use the createPipe()
method.
To act as a socket server and create connections for one or more client, see the
InterprocessConnectionServer class.
IMPORTANT NOTE: Your derived Connection class *must* call `disconnect` in its destructor
in order to cancel any pending messages before the class is destroyed.
@see InterprocessConnectionServer, Socket, NamedPipe
@tags{Events}
*/
class JUCE_API InterprocessConnection
{
public:
//==============================================================================
/** Creates a connection.
Connections are created manually, connecting them with the connectToSocket()
or connectToPipe() methods, or they are created automatically by a InterprocessConnectionServer
when a client wants to connect.
@param callbacksOnMessageThread if true, callbacks to the connectionMade(),
connectionLost() and messageReceived() methods will
always be made using the message thread; if false,
these will be called immediately on the connection's
own thread.
@param magicMessageHeaderNumber a magic number to use in the header to check the
validity of the data blocks being sent and received. This
can be any number, but the sender and receiver must obviously
use matching values or they won't recognise each other.
*/
InterprocessConnection (bool callbacksOnMessageThread = true,
uint32 magicMessageHeaderNumber = 0xf2b49e2c);
/** Destructor. */
virtual ~InterprocessConnection();
//==============================================================================
/** Tries to connect this object to a socket.
For this to work, the machine on the other end needs to have a InterprocessConnectionServer
object waiting to receive client connections on this port number.
@param hostName the host computer, either a network address or name
@param portNumber the socket port number to try to connect to
@param timeOutMillisecs how long to keep trying before giving up
@returns true if the connection is established successfully
@see Socket
*/
bool connectToSocket (const String& hostName,
int portNumber,
int timeOutMillisecs);
/** Tries to connect the object to an existing named pipe.
For this to work, another process on the same computer must already have opened
an InterprocessConnection object and used createPipe() to create a pipe for this
to connect to.
@param pipeName the name to use for the pipe - this should be unique to your app
@param pipeReceiveMessageTimeoutMs a timeout length to be used when reading or writing
to the pipe, or -1 for an infinite timeout.
@returns true if it connects successfully.
@see createPipe, NamedPipe
*/
bool connectToPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs);
/** Tries to create a new pipe for other processes to connect to.
This creates a pipe with the given name, so that other processes can use
connectToPipe() to connect to the other end.
@param pipeName the name to use for the pipe - this should be unique to your app
@param pipeReceiveMessageTimeoutMs a timeout length to be used when reading or writing
to the pipe, or -1 for an infinite timeout
@param mustNotExist if set to true, the method will fail if the pipe already exists
@returns true if the pipe was created, or false if it fails (e.g. if another process is
already using the pipe)
*/
bool createPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist = false);
/** Whether the disconnect call should trigger callbacks. */
enum class Notify { no, yes };
/** Disconnects and closes any currently-open sockets or pipes.
Derived classes *must* call this in their destructors in order to avoid undefined
behaviour.
@param timeoutMs the time in ms to wait before killing the thread by force
@param notify whether or not to call `connectionLost`
*/
void disconnect (int timeoutMs = -1, Notify notify = Notify::yes);
/** True if a socket or pipe is currently active. */
bool isConnected() const;
/** Returns the socket that this connection is using (or nullptr if it uses a pipe). */
StreamingSocket* getSocket() const noexcept { return socket.get(); }
/** Returns the pipe that this connection is using (or nullptr if it uses a socket). */
NamedPipe* getPipe() const noexcept { return pipe.get(); }
/** Returns the name of the machine at the other end of this connection.
This may return an empty string if the name is unknown.
*/
String getConnectedHostName() const;
//==============================================================================
/** Tries to send a message to the other end of this connection.
This will fail if it's not connected, or if there's some kind of write error. If
it succeeds, the connection object at the other end will receive the message by
a callback to its messageReceived() method.
@see messageReceived
*/
bool sendMessage (const MemoryBlock& message);
//==============================================================================
/** Called when the connection is first connected.
If the connection was created with the callbacksOnMessageThread flag set, then
this will be called on the message thread; otherwise it will be called on a server
thread.
*/
virtual void connectionMade() = 0;
/** Called when the connection is broken.
If the connection was created with the callbacksOnMessageThread flag set, then
this will be called on the message thread; otherwise it will be called on a server
thread.
*/
virtual void connectionLost() = 0;
/** Called when a message arrives.
When the object at the other end of this connection sends us a message with sendMessage(),
this callback is used to deliver it to us.
If the connection was created with the callbacksOnMessageThread flag set, then
this will be called on the message thread; otherwise it will be called on a server
thread.
@see sendMessage
*/
virtual void messageReceived (const MemoryBlock& message) = 0;
private:
//==============================================================================
ReadWriteLock pipeAndSocketLock;
std::unique_ptr<StreamingSocket> socket;
std::unique_ptr<NamedPipe> pipe;
bool callbackConnectionState = false;
const bool useMessageThread;
const uint32 magicMessageHeader;
int pipeReceiveMessageTimeout = -1;
friend class InterprocessConnectionServer;
void initialise();
void initialiseWithSocket (std::unique_ptr<StreamingSocket>);
void initialiseWithPipe (std::unique_ptr<NamedPipe>);
void deletePipeAndSocket();
void connectionMadeInt();
void connectionLostInt();
void deliverDataInt (const MemoryBlock&);
bool readNextMessage();
int readData (void*, int);
struct ConnectionThread;
std::unique_ptr<ConnectionThread> thread;
std::atomic<bool> threadIsRunning { false };
class SafeAction;
std::shared_ptr<SafeAction> safeAction;
void runThread();
int writeData (void*, int);
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (InterprocessConnection)
};
} // namespace juce
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
class InterprocessConnectionServer;
class MemoryBlock;
//==============================================================================
/**
Manages a simple two-way messaging connection to another process, using either
a socket or a named pipe as the transport medium.
To connect to a waiting socket or an open pipe, use the connectToSocket() or
connectToPipe() methods. If this succeeds, messages can be sent to the other end,
and incoming messages will result in a callback via the messageReceived()
method.
To open a pipe and wait for another client to connect to it, use the createPipe()
method.
To act as a socket server and create connections for one or more client, see the
InterprocessConnectionServer class.
IMPORTANT NOTE: Your derived Connection class *must* call `disconnect` in its destructor
in order to cancel any pending messages before the class is destroyed.
@see InterprocessConnectionServer, Socket, NamedPipe
@tags{Events}
*/
class JUCE_API InterprocessConnection
{
public:
//==============================================================================
/** Creates a connection.
Connections are created manually, connecting them with the connectToSocket()
or connectToPipe() methods, or they are created automatically by a InterprocessConnectionServer
when a client wants to connect.
@param callbacksOnMessageThread if true, callbacks to the connectionMade(),
connectionLost() and messageReceived() methods will
always be made using the message thread; if false,
these will be called immediately on the connection's
own thread.
@param magicMessageHeaderNumber a magic number to use in the header to check the
validity of the data blocks being sent and received. This
can be any number, but the sender and receiver must obviously
use matching values or they won't recognise each other.
*/
InterprocessConnection (bool callbacksOnMessageThread = true,
uint32 magicMessageHeaderNumber = 0xf2b49e2c);
/** Destructor. */
virtual ~InterprocessConnection();
//==============================================================================
/** Tries to connect this object to a socket.
For this to work, the machine on the other end needs to have a InterprocessConnectionServer
object waiting to receive client connections on this port number.
@param hostName the host computer, either a network address or name
@param portNumber the socket port number to try to connect to
@param timeOutMillisecs how long to keep trying before giving up
@returns true if the connection is established successfully
@see Socket
*/
bool connectToSocket (const String& hostName,
int portNumber,
int timeOutMillisecs);
/** Tries to connect the object to an existing named pipe.
For this to work, another process on the same computer must already have opened
an InterprocessConnection object and used createPipe() to create a pipe for this
to connect to.
@param pipeName the name to use for the pipe - this should be unique to your app
@param pipeReceiveMessageTimeoutMs a timeout length to be used when reading or writing
to the pipe, or -1 for an infinite timeout.
@returns true if it connects successfully.
@see createPipe, NamedPipe
*/
bool connectToPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs);
/** Tries to create a new pipe for other processes to connect to.
This creates a pipe with the given name, so that other processes can use
connectToPipe() to connect to the other end.
@param pipeName the name to use for the pipe - this should be unique to your app
@param pipeReceiveMessageTimeoutMs a timeout length to be used when reading or writing
to the pipe, or -1 for an infinite timeout
@param mustNotExist if set to true, the method will fail if the pipe already exists
@returns true if the pipe was created, or false if it fails (e.g. if another process is
already using the pipe)
*/
bool createPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist = false);
/** Whether the disconnect call should trigger callbacks. */
enum class Notify { no, yes };
/** Disconnects and closes any currently-open sockets or pipes.
Derived classes *must* call this in their destructors in order to avoid undefined
behaviour.
@param timeoutMs the time in ms to wait before killing the thread by force
@param notify whether or not to call `connectionLost`
*/
void disconnect (int timeoutMs = -1, Notify notify = Notify::yes);
/** True if a socket or pipe is currently active. */
bool isConnected() const;
/** Returns the socket that this connection is using (or nullptr if it uses a pipe). */
StreamingSocket* getSocket() const noexcept { return socket.get(); }
/** Returns the pipe that this connection is using (or nullptr if it uses a socket). */
NamedPipe* getPipe() const noexcept { return pipe.get(); }
/** Returns the name of the machine at the other end of this connection.
This may return an empty string if the name is unknown.
*/
String getConnectedHostName() const;
//==============================================================================
/** Tries to send a message to the other end of this connection.
This will fail if it's not connected, or if there's some kind of write error. If
it succeeds, the connection object at the other end will receive the message by
a callback to its messageReceived() method.
@see messageReceived
*/
bool sendMessage (const MemoryBlock& message);
//==============================================================================
/** Called when the connection is first connected.
If the connection was created with the callbacksOnMessageThread flag set, then
this will be called on the message thread; otherwise it will be called on a server
thread.
*/
virtual void connectionMade() = 0;
/** Called when the connection is broken.
If the connection was created with the callbacksOnMessageThread flag set, then
this will be called on the message thread; otherwise it will be called on a server
thread.
*/
virtual void connectionLost() = 0;
/** Called when a message arrives.
When the object at the other end of this connection sends us a message with sendMessage(),
this callback is used to deliver it to us.
If the connection was created with the callbacksOnMessageThread flag set, then
this will be called on the message thread; otherwise it will be called on a server
thread.
@see sendMessage
*/
virtual void messageReceived (const MemoryBlock& message) = 0;
private:
//==============================================================================
ReadWriteLock pipeAndSocketLock;
std::unique_ptr<StreamingSocket> socket;
std::unique_ptr<NamedPipe> pipe;
bool callbackConnectionState = false;
const bool useMessageThread;
const uint32 magicMessageHeader;
int pipeReceiveMessageTimeout = -1;
friend class InterprocessConnectionServer;
void initialise();
void initialiseWithSocket (std::unique_ptr<StreamingSocket>);
void initialiseWithPipe (std::unique_ptr<NamedPipe>);
void deletePipeAndSocket();
void connectionMadeInt();
void connectionLostInt();
void deliverDataInt (const MemoryBlock&);
bool readNextMessage();
int readData (void*, int);
struct ConnectionThread;
std::unique_ptr<ConnectionThread> thread;
std::atomic<bool> threadIsRunning { false };
class SafeAction;
std::shared_ptr<SafeAction> safeAction;
void runThread();
int writeData (void*, int);
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (InterprocessConnection)
};
} // namespace juce

View File

@ -1,80 +1,80 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2020 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
InterprocessConnectionServer::InterprocessConnectionServer() : Thread ("JUCE IPC server")
{
}
InterprocessConnectionServer::~InterprocessConnectionServer()
{
stop();
}
//==============================================================================
bool InterprocessConnectionServer::beginWaitingForSocket (const int portNumber, const String& bindAddress)
{
stop();
socket.reset (new StreamingSocket());
if (socket->createListener (portNumber, bindAddress))
{
startThread();
return true;
}
socket.reset();
return false;
}
void InterprocessConnectionServer::stop()
{
signalThreadShouldExit();
if (socket != nullptr)
socket->close();
stopThread (4000);
socket.reset();
}
int InterprocessConnectionServer::getBoundPort() const noexcept
{
return (socket == nullptr) ? -1 : socket->getBoundPort();
}
void InterprocessConnectionServer::run()
{
while ((! threadShouldExit()) && socket != nullptr)
{
std::unique_ptr<StreamingSocket> clientSocket (socket->waitForNextConnection());
if (clientSocket != nullptr)
if (auto* newConnection = createConnectionObject())
newConnection->initialiseWithSocket (std::move (clientSocket));
}
}
} // namespace juce
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
InterprocessConnectionServer::InterprocessConnectionServer() : Thread ("JUCE IPC server")
{
}
InterprocessConnectionServer::~InterprocessConnectionServer()
{
stop();
}
//==============================================================================
bool InterprocessConnectionServer::beginWaitingForSocket (const int portNumber, const String& bindAddress)
{
stop();
socket.reset (new StreamingSocket());
if (socket->createListener (portNumber, bindAddress))
{
startThread();
return true;
}
socket.reset();
return false;
}
void InterprocessConnectionServer::stop()
{
signalThreadShouldExit();
if (socket != nullptr)
socket->close();
stopThread (4000);
socket.reset();
}
int InterprocessConnectionServer::getBoundPort() const noexcept
{
return (socket == nullptr) ? -1 : socket->getBoundPort();
}
void InterprocessConnectionServer::run()
{
while ((! threadShouldExit()) && socket != nullptr)
{
std::unique_ptr<StreamingSocket> clientSocket (socket->waitForNextConnection());
if (clientSocket != nullptr)
if (auto* newConnection = createConnectionObject())
newConnection->initialiseWithSocket (std::move (clientSocket));
}
}
} // namespace juce

View File

@ -1,106 +1,106 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2020 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
//==============================================================================
/**
An object that waits for client sockets to connect to a port on this host, and
creates InterprocessConnection objects for each one.
To use this, create a class derived from it which implements the createConnectionObject()
method, so that it creates suitable connection objects for each client that tries
to connect.
@see InterprocessConnection
@tags{Events}
*/
class JUCE_API InterprocessConnectionServer : private Thread
{
public:
//==============================================================================
/** Creates an uninitialised server object.
*/
InterprocessConnectionServer();
/** Destructor. */
~InterprocessConnectionServer() override;
//==============================================================================
/** Starts an internal thread which listens on the given port number.
While this is running, if another process tries to connect with the
InterprocessConnection::connectToSocket() method, this object will call
createConnectionObject() to create a connection to that client.
Use stop() to stop the thread running.
@param portNumber The port on which the server will receive
connections
@param bindAddress The address on which the server will listen
for connections. An empty string indicates
that it should listen on all addresses
assigned to this machine.
@see createConnectionObject, stop
*/
bool beginWaitingForSocket (int portNumber, const String& bindAddress = String());
/** Terminates the listener thread, if it's active.
@see beginWaitingForSocket
*/
void stop();
/** Returns the local port number to which this server is currently bound.
This is useful if you need to know to which port the OS has actually bound your
socket when calling beginWaitingForSocket with a port number of zero.
Returns -1 if the function fails.
*/
int getBoundPort() const noexcept;
protected:
/** Creates a suitable connection object for a client process that wants to
connect to this one.
This will be called by the listener thread when a client process tries
to connect, and must return a new InterprocessConnection object that will
then run as this end of the connection.
@see InterprocessConnection
*/
virtual InterprocessConnection* createConnectionObject() = 0;
private:
//==============================================================================
std::unique_ptr<StreamingSocket> socket;
void run() override;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (InterprocessConnectionServer)
};
} // namespace juce
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
//==============================================================================
/**
An object that waits for client sockets to connect to a port on this host, and
creates InterprocessConnection objects for each one.
To use this, create a class derived from it which implements the createConnectionObject()
method, so that it creates suitable connection objects for each client that tries
to connect.
@see InterprocessConnection
@tags{Events}
*/
class JUCE_API InterprocessConnectionServer : private Thread
{
public:
//==============================================================================
/** Creates an uninitialised server object.
*/
InterprocessConnectionServer();
/** Destructor. */
~InterprocessConnectionServer() override;
//==============================================================================
/** Starts an internal thread which listens on the given port number.
While this is running, if another process tries to connect with the
InterprocessConnection::connectToSocket() method, this object will call
createConnectionObject() to create a connection to that client.
Use stop() to stop the thread running.
@param portNumber The port on which the server will receive
connections
@param bindAddress The address on which the server will listen
for connections. An empty string indicates
that it should listen on all addresses
assigned to this machine.
@see createConnectionObject, stop
*/
bool beginWaitingForSocket (int portNumber, const String& bindAddress = String());
/** Terminates the listener thread, if it's active.
@see beginWaitingForSocket
*/
void stop();
/** Returns the local port number to which this server is currently bound.
This is useful if you need to know to which port the OS has actually bound your
socket when calling beginWaitingForSocket with a port number of zero.
Returns -1 if the function fails.
*/
int getBoundPort() const noexcept;
protected:
/** Creates a suitable connection object for a client process that wants to
connect to this one.
This will be called by the listener thread when a client process tries
to connect, and must return a new InterprocessConnection object that will
then run as this end of the connection.
@see InterprocessConnection
*/
virtual InterprocessConnection* createConnectionObject() = 0;
private:
//==============================================================================
std::unique_ptr<StreamingSocket> socket;
void run() override;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (InterprocessConnectionServer)
};
} // namespace juce

View File

@ -1,212 +1,212 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2020 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
#if JUCE_ANDROID
extern void acquireMulticastLock();
extern void releaseMulticastLock();
#endif
NetworkServiceDiscovery::Advertiser::Advertiser (const String& serviceTypeUID,
const String& serviceDescription,
int broadcastPortToUse, int connectionPort,
RelativeTime minTimeBetweenBroadcasts)
: Thread ("Discovery_broadcast"),
message (serviceTypeUID), broadcastPort (broadcastPortToUse),
minInterval (minTimeBetweenBroadcasts)
{
message.setAttribute ("id", Uuid().toString());
message.setAttribute ("name", serviceDescription);
message.setAttribute ("address", String());
message.setAttribute ("port", connectionPort);
startThread (2);
}
NetworkServiceDiscovery::Advertiser::~Advertiser()
{
stopThread (2000);
socket.shutdown();
}
void NetworkServiceDiscovery::Advertiser::run()
{
if (! socket.bindToPort (0))
{
jassertfalse;
return;
}
while (! threadShouldExit())
{
sendBroadcast();
wait ((int) minInterval.inMilliseconds());
}
}
void NetworkServiceDiscovery::Advertiser::sendBroadcast()
{
static IPAddress local = IPAddress::local();
for (auto& address : IPAddress::getAllAddresses())
{
if (address == local)
continue;
message.setAttribute ("address", address.toString());
auto broadcastAddress = IPAddress::getInterfaceBroadcastAddress (address);
auto data = message.toString (XmlElement::TextFormat().singleLine().withoutHeader());
socket.write (broadcastAddress.toString(), broadcastPort, data.toRawUTF8(), (int) data.getNumBytesAsUTF8());
}
}
//==============================================================================
NetworkServiceDiscovery::AvailableServiceList::AvailableServiceList (const String& serviceType, int broadcastPort)
: Thread ("Discovery_listen"), serviceTypeUID (serviceType)
{
#if JUCE_ANDROID
acquireMulticastLock();
#endif
socket.bindToPort (broadcastPort);
startThread (2);
}
NetworkServiceDiscovery::AvailableServiceList::~AvailableServiceList()
{
socket.shutdown();
stopThread (2000);
#if JUCE_ANDROID
releaseMulticastLock();
#endif
}
void NetworkServiceDiscovery::AvailableServiceList::run()
{
while (! threadShouldExit())
{
if (socket.waitUntilReady (true, 200) == 1)
{
char buffer[1024];
auto bytesRead = socket.read (buffer, sizeof (buffer) - 1, false);
if (bytesRead > 10)
if (auto xml = parseXML (String (CharPointer_UTF8 (buffer),
CharPointer_UTF8 (buffer + bytesRead))))
if (xml->hasTagName (serviceTypeUID))
handleMessage (*xml);
}
removeTimedOutServices();
}
}
std::vector<NetworkServiceDiscovery::Service> NetworkServiceDiscovery::AvailableServiceList::getServices() const
{
const ScopedLock sl (listLock);
auto listCopy = services;
return listCopy;
}
void NetworkServiceDiscovery::AvailableServiceList::handleAsyncUpdate()
{
if (onChange != nullptr)
onChange();
}
void NetworkServiceDiscovery::AvailableServiceList::handleMessage (const XmlElement& xml)
{
Service service;
service.instanceID = xml.getStringAttribute ("id");
if (service.instanceID.trim().isNotEmpty())
{
service.description = xml.getStringAttribute ("name");
service.address = IPAddress (xml.getStringAttribute ("address"));
service.port = xml.getIntAttribute ("port");
service.lastSeen = Time::getCurrentTime();
handleMessage (service);
}
}
static void sortServiceList (std::vector<NetworkServiceDiscovery::Service>& services)
{
auto compareServices = [] (const NetworkServiceDiscovery::Service& s1,
const NetworkServiceDiscovery::Service& s2)
{
return s1.instanceID < s2.instanceID;
};
std::sort (services.begin(), services.end(), compareServices);
}
void NetworkServiceDiscovery::AvailableServiceList::handleMessage (const Service& service)
{
const ScopedLock sl (listLock);
for (auto& s : services)
{
if (s.instanceID == service.instanceID)
{
if (s.description != service.description
|| s.address != service.address
|| s.port != service.port)
{
s = service;
triggerAsyncUpdate();
}
s.lastSeen = service.lastSeen;
return;
}
}
services.push_back (service);
sortServiceList (services);
triggerAsyncUpdate();
}
void NetworkServiceDiscovery::AvailableServiceList::removeTimedOutServices()
{
const double timeoutSeconds = 5.0;
auto oldestAllowedTime = Time::getCurrentTime() - RelativeTime::seconds (timeoutSeconds);
const ScopedLock sl (listLock);
auto oldEnd = std::end (services);
auto newEnd = std::remove_if (std::begin (services), oldEnd,
[=] (const Service& s) { return s.lastSeen < oldestAllowedTime; });
if (newEnd != oldEnd)
{
services.erase (newEnd, oldEnd);
triggerAsyncUpdate();
}
}
} // namespace juce
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
#if JUCE_ANDROID
extern void acquireMulticastLock();
extern void releaseMulticastLock();
#endif
NetworkServiceDiscovery::Advertiser::Advertiser (const String& serviceTypeUID,
const String& serviceDescription,
int broadcastPortToUse, int connectionPort,
RelativeTime minTimeBetweenBroadcasts)
: Thread ("Discovery_broadcast"),
message (serviceTypeUID), broadcastPort (broadcastPortToUse),
minInterval (minTimeBetweenBroadcasts)
{
message.setAttribute ("id", Uuid().toString());
message.setAttribute ("name", serviceDescription);
message.setAttribute ("address", String());
message.setAttribute ("port", connectionPort);
startThread (2);
}
NetworkServiceDiscovery::Advertiser::~Advertiser()
{
stopThread (2000);
socket.shutdown();
}
void NetworkServiceDiscovery::Advertiser::run()
{
if (! socket.bindToPort (0))
{
jassertfalse;
return;
}
while (! threadShouldExit())
{
sendBroadcast();
wait ((int) minInterval.inMilliseconds());
}
}
void NetworkServiceDiscovery::Advertiser::sendBroadcast()
{
static IPAddress local = IPAddress::local();
for (auto& address : IPAddress::getAllAddresses())
{
if (address == local)
continue;
message.setAttribute ("address", address.toString());
auto broadcastAddress = IPAddress::getInterfaceBroadcastAddress (address);
auto data = message.toString (XmlElement::TextFormat().singleLine().withoutHeader());
socket.write (broadcastAddress.toString(), broadcastPort, data.toRawUTF8(), (int) data.getNumBytesAsUTF8());
}
}
//==============================================================================
NetworkServiceDiscovery::AvailableServiceList::AvailableServiceList (const String& serviceType, int broadcastPort)
: Thread ("Discovery_listen"), serviceTypeUID (serviceType)
{
#if JUCE_ANDROID
acquireMulticastLock();
#endif
socket.bindToPort (broadcastPort);
startThread (2);
}
NetworkServiceDiscovery::AvailableServiceList::~AvailableServiceList()
{
socket.shutdown();
stopThread (2000);
#if JUCE_ANDROID
releaseMulticastLock();
#endif
}
void NetworkServiceDiscovery::AvailableServiceList::run()
{
while (! threadShouldExit())
{
if (socket.waitUntilReady (true, 200) == 1)
{
char buffer[1024];
auto bytesRead = socket.read (buffer, sizeof (buffer) - 1, false);
if (bytesRead > 10)
if (auto xml = parseXML (String (CharPointer_UTF8 (buffer),
CharPointer_UTF8 (buffer + bytesRead))))
if (xml->hasTagName (serviceTypeUID))
handleMessage (*xml);
}
removeTimedOutServices();
}
}
std::vector<NetworkServiceDiscovery::Service> NetworkServiceDiscovery::AvailableServiceList::getServices() const
{
const ScopedLock sl (listLock);
auto listCopy = services;
return listCopy;
}
void NetworkServiceDiscovery::AvailableServiceList::handleAsyncUpdate()
{
if (onChange != nullptr)
onChange();
}
void NetworkServiceDiscovery::AvailableServiceList::handleMessage (const XmlElement& xml)
{
Service service;
service.instanceID = xml.getStringAttribute ("id");
if (service.instanceID.trim().isNotEmpty())
{
service.description = xml.getStringAttribute ("name");
service.address = IPAddress (xml.getStringAttribute ("address"));
service.port = xml.getIntAttribute ("port");
service.lastSeen = Time::getCurrentTime();
handleMessage (service);
}
}
static void sortServiceList (std::vector<NetworkServiceDiscovery::Service>& services)
{
auto compareServices = [] (const NetworkServiceDiscovery::Service& s1,
const NetworkServiceDiscovery::Service& s2)
{
return s1.instanceID < s2.instanceID;
};
std::sort (services.begin(), services.end(), compareServices);
}
void NetworkServiceDiscovery::AvailableServiceList::handleMessage (const Service& service)
{
const ScopedLock sl (listLock);
for (auto& s : services)
{
if (s.instanceID == service.instanceID)
{
if (s.description != service.description
|| s.address != service.address
|| s.port != service.port)
{
s = service;
triggerAsyncUpdate();
}
s.lastSeen = service.lastSeen;
return;
}
}
services.push_back (service);
sortServiceList (services);
triggerAsyncUpdate();
}
void NetworkServiceDiscovery::AvailableServiceList::removeTimedOutServices()
{
const double timeoutSeconds = 5.0;
auto oldestAllowedTime = Time::getCurrentTime() - RelativeTime::seconds (timeoutSeconds);
const ScopedLock sl (listLock);
auto oldEnd = std::end (services);
auto newEnd = std::remove_if (std::begin (services), oldEnd,
[=] (const Service& s) { return s.lastSeen < oldestAllowedTime; });
if (newEnd != oldEnd)
{
services.erase (newEnd, oldEnd);
triggerAsyncUpdate();
}
}
} // namespace juce

View File

@ -1,138 +1,138 @@
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2020 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
//==============================================================================
/**
Contains classes that implement a simple protocol for broadcasting the availability
and location of a discoverable service on the local network, and for maintaining a
list of known services.
@tags{Events}
*/
struct NetworkServiceDiscovery
{
/** An object which runs a thread to repeatedly broadcast the existence of a
discoverable service.
To use, simply create an instance of an Advertiser and it'll broadcast until
you delete it.
@tags{Events}
*/
struct Advertiser : private Thread
{
/** Creates and starts an Advertiser thread, broadcasting with the given properties.
@param serviceTypeUID A user-supplied string to define the type of service this represents
@param serviceDescription A description string that will appear in the Service::description field for clients
@param broadcastPort The port number on which to broadcast the service discovery packets
@param connectionPort The port number that will be sent to appear in the Service::port field
@param minTimeBetweenBroadcasts The interval to wait between sending broadcast messages
*/
Advertiser (const String& serviceTypeUID,
const String& serviceDescription,
int broadcastPort,
int connectionPort,
RelativeTime minTimeBetweenBroadcasts = RelativeTime::seconds (1.5));
/** Destructor */
~Advertiser() override;
private:
XmlElement message;
const int broadcastPort;
const RelativeTime minInterval;
DatagramSocket socket { true };
void run() override;
void sendBroadcast();
};
//==============================================================================
/**
Contains information about a service that has been found on the network.
@see AvailableServiceList, Advertiser
@tags{Events}
*/
struct Service
{
String instanceID; /**< A UUID that identifies the particular instance of the Advertiser class. */
String description; /**< The service description as sent by the Advertiser */
IPAddress address; /**< The IP address of the advertiser */
int port; /**< The port number of the advertiser */
Time lastSeen; /**< The time of the last ping received from the advertiser */
};
//==============================================================================
/**
Watches the network for broadcasts from Advertiser objects, and keeps a list of
all the currently active instances.
Just create an instance of AvailableServiceList and it will start listening - you
can register a callback with its onChange member to find out when services
appear/disappear, and you can call getServices() to find out the current list.
@see Service, Advertiser
@tags{Events}
*/
struct AvailableServiceList : private Thread,
private AsyncUpdater
{
/** Creates an AvailableServiceList that will bind to the given port number and watch
the network for Advertisers broadcasting the given service type.
This will only detect broadcasts from an Advertiser object with a matching
serviceTypeUID value, and where the broadcastPort matches.
*/
AvailableServiceList (const String& serviceTypeUID, int broadcastPort);
/** Destructor */
~AvailableServiceList() override;
/** A lambda that can be set to receive a callback when the list changes */
std::function<void()> onChange;
/** Returns a list of the currently known services. */
std::vector<Service> getServices() const;
private:
DatagramSocket socket { true };
String serviceTypeUID;
CriticalSection listLock;
std::vector<Service> services;
void run() override;
void handleAsyncUpdate() override;
void handleMessage (const XmlElement&);
void handleMessage (const Service&);
void removeTimedOutServices();
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (AvailableServiceList)
};
};
} // namespace juce
/*
==============================================================================
This file is part of the JUCE library.
Copyright (c) 2022 - Raw Material Software Limited
JUCE is an open source library subject to commercial or open-source
licensing.
The code included in this file is provided under the terms of the ISC license
http://www.isc.org/downloads/software-support-policy/isc-license. Permission
To use, copy, modify, and/or distribute this software for any purpose with or
without fee is hereby granted provided that the above copyright notice and
this permission notice appear in all copies.
JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
DISCLAIMED.
==============================================================================
*/
namespace juce
{
//==============================================================================
/**
Contains classes that implement a simple protocol for broadcasting the availability
and location of a discoverable service on the local network, and for maintaining a
list of known services.
@tags{Events}
*/
struct NetworkServiceDiscovery
{
/** An object which runs a thread to repeatedly broadcast the existence of a
discoverable service.
To use, simply create an instance of an Advertiser and it'll broadcast until
you delete it.
@tags{Events}
*/
struct Advertiser : private Thread
{
/** Creates and starts an Advertiser thread, broadcasting with the given properties.
@param serviceTypeUID A user-supplied string to define the type of service this represents
@param serviceDescription A description string that will appear in the Service::description field for clients
@param broadcastPort The port number on which to broadcast the service discovery packets
@param connectionPort The port number that will be sent to appear in the Service::port field
@param minTimeBetweenBroadcasts The interval to wait between sending broadcast messages
*/
Advertiser (const String& serviceTypeUID,
const String& serviceDescription,
int broadcastPort,
int connectionPort,
RelativeTime minTimeBetweenBroadcasts = RelativeTime::seconds (1.5));
/** Destructor */
~Advertiser() override;
private:
XmlElement message;
const int broadcastPort;
const RelativeTime minInterval;
DatagramSocket socket { true };
void run() override;
void sendBroadcast();
};
//==============================================================================
/**
Contains information about a service that has been found on the network.
@see AvailableServiceList, Advertiser
@tags{Events}
*/
struct Service
{
String instanceID; /**< A UUID that identifies the particular instance of the Advertiser class. */
String description; /**< The service description as sent by the Advertiser */
IPAddress address; /**< The IP address of the advertiser */
int port; /**< The port number of the advertiser */
Time lastSeen; /**< The time of the last ping received from the advertiser */
};
//==============================================================================
/**
Watches the network for broadcasts from Advertiser objects, and keeps a list of
all the currently active instances.
Just create an instance of AvailableServiceList and it will start listening - you
can register a callback with its onChange member to find out when services
appear/disappear, and you can call getServices() to find out the current list.
@see Service, Advertiser
@tags{Events}
*/
struct AvailableServiceList : private Thread,
private AsyncUpdater
{
/** Creates an AvailableServiceList that will bind to the given port number and watch
the network for Advertisers broadcasting the given service type.
This will only detect broadcasts from an Advertiser object with a matching
serviceTypeUID value, and where the broadcastPort matches.
*/
AvailableServiceList (const String& serviceTypeUID, int broadcastPort);
/** Destructor */
~AvailableServiceList() override;
/** A lambda that can be set to receive a callback when the list changes */
std::function<void()> onChange;
/** Returns a list of the currently known services. */
std::vector<Service> getServices() const;
private:
DatagramSocket socket { true };
String serviceTypeUID;
CriticalSection listLock;
std::vector<Service> services;
void run() override;
void handleAsyncUpdate() override;
void handleMessage (const XmlElement&);
void handleMessage (const Service&);
void removeTimedOutServices();
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (AvailableServiceList)
};
};
} // namespace juce