Pstream with a single comm, updated parallel ops

This commit is contained in:
Hrvoje Jasak 2016-09-14 05:24:50 +01:00
parent aa518c6931
commit 7bab48776c
77 changed files with 3288 additions and 1354 deletions

View file

@ -1047,6 +1047,18 @@ void Foam::faMesh::addFaPatches(const List<faPatch*>& p)
}
Foam::label Foam::faMesh::comm() const
{
return comm_;
}
Foam::label& Foam::faMesh::comm()
{
return comm_;
}
const Foam::objectRegistry& Foam::faMesh::thisDb() const
{
return mesh().thisDb();

View file

@ -110,6 +110,12 @@ class faMesh
mutable label nFaces_;
// Communication support
//- Communicator used for parallel communication
label comm_;
// Demand-driven data
//- Primitive patch
@ -287,8 +293,7 @@ public:
);
// Destructor
//- Destructor
virtual ~faMesh();
@ -369,6 +374,15 @@ public:
}
// Communication support
//- Return communicator used for parallel communication
label comm() const;
//- Return communicator used for parallel communication
label& comm();
// Access
//- Return reference to the mesh database

View file

@ -55,6 +55,18 @@ processorFaPatch::~processorFaPatch()
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
Foam::label Foam::processorFaPatch::comm() const
{
return boundaryMesh().mesh().comm();
}
int Foam::processorFaPatch::tag() const
{
return Pstream::msgType();
}
void processorFaPatch::makeNonGlobalPatchPoints() const
{
// If it is not runing parallel or there are no global points

View file

@ -37,7 +37,6 @@ SourceFiles
#include "coupledFaPatch.H"
#include "processorLduInterface.H"
// #include "processorPolyPatch.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -55,7 +54,10 @@ class processorFaPatch
{
// Private data
//- My processro number
int myProcNo_;
//- Neighbour processor number
int neighbProcNo_;
//- Processor-neighbbour patch edge centres
@ -75,6 +77,7 @@ class processorFaPatch
// non-global, i.e. present in this processor patch
mutable labelList* nonGlobalPatchPointsPtr_;
protected:
// Protected Member functions
@ -88,9 +91,8 @@ protected:
//- Find non-globa patch points
void makeNonGlobalPatchPoints() const;
protected:
// Protected Member functions
// Geometry functions
//- Initialise the calculation of the patch geometry
void initGeometry();
@ -110,6 +112,7 @@ protected:
//- Update of the patch topology
virtual void updateMesh();
public:
//- Runtime type information
@ -160,8 +163,8 @@ public:
nonGlobalPatchPointsPtr_(NULL)
{}
// Destructor
//- Destructor
virtual ~processorFaPatch();
@ -192,6 +195,16 @@ public:
}
}
// Communications support
//- Return communicator used for communication
virtual label comm() const;
//- Return message tag to use for communication
virtual int tag() const;
//- Return face transformation tensor
virtual const tensorField& forwardT() const
{

View file

@ -249,7 +249,7 @@ public:
) const;
//- Processor coupled interface functions
// Processor coupled interface functions
//- Return processor number
virtual int myProcNo() const
@ -263,6 +263,16 @@ public:
return procPatch_.neighbProcNo();
}
// Communication support
//- Return communicator used for parallel communication
virtual int comm() const
{
return procPatch_.comm();
}
//- Does the patch field perform the transfromation
virtual bool doTransform() const
{

View file

@ -698,7 +698,10 @@ void Foam::fvMatrix<Type>::relax()
{
if (psi_.mesh().solutionDict().relaxEquation(psi_.name()))
{
relax(psi_.mesh().solutionDict().equationRelaxationFactor(psi_.name()));
relax
(
psi_.mesh().solutionDict().equationRelaxationFactor(psi_.name())
);
}
else
{

View file

@ -216,8 +216,7 @@ public:
);
// Destructor
//- Destructor
virtual ~fvMesh();
@ -285,6 +284,15 @@ public:
}
// Communication support
//- Return communicator used for parallel communication
label comm() const
{
return polyMesh::comm();
}
//- Return cell volumes
const DimensionedField<scalar, volMesh>& V() const;

View file

@ -90,8 +90,7 @@ public:
{}
// Destructor
//- Destructor
virtual ~cyclicGgiFvPatch()
{}

View file

@ -92,13 +92,27 @@ public:
{}
// Destructor
//- Destructor
virtual ~ggiFvPatch();
// Member functions
// Communication support
//- Return communicator used for parallel communication
virtual int comm() const
{
return ggiPolyPatch_.comm();
}
//- Return message tag used for sending
virtual int tag() const
{
return ggiPolyPatch_.tag();
}
// Access
//- Return shadow patch

View file

@ -95,13 +95,27 @@ public:
{}
// Destructor
//- Destructor
virtual ~mixingPlaneFvPatch();
// Member functions
// Communication support
//- Return communicator used for parallel communication
virtual int comm() const
{
return mixingPlanePolyPatch_.comm();
}
//- Return message tag used for sending
virtual int tag() const
{
return mixingPlanePolyPatch_.tag();
}
// Access
//- Return shadow patch

View file

@ -86,8 +86,7 @@ public:
{}
// Destructor
//- Destructor
virtual ~processorFvPatch()
{}
@ -119,6 +118,21 @@ public:
}
}
// Communication support
//- Return communicator used for parallel communication
virtual int comm() const
{
return procPolyPatch_.comm();
}
//- Return message tag used for sending
virtual int tag() const
{
return procPolyPatch_.tag();
}
//- Return face transformation tensor
virtual const tensorField& forwardT() const
{

View file

@ -95,8 +95,7 @@ public:
{}
// Destructor
//- Destructor
virtual ~regionCoupleFvPatch();
@ -123,6 +122,21 @@ public:
virtual tmp<vectorField> delta() const;
// Communication support
//- Return communicator used for parallel communication
virtual int comm() const
{
return rcPolyPatch_.comm();
}
//- Return message tag used for sending
virtual int tag() const
{
return rcPolyPatch_.tag();
}
// Interpolation
//- Interpolate face field

View file

@ -147,7 +147,6 @@ $(Pstreams)/IPstream.C
$(Pstreams)/OPstream.C
$(Pstreams)/IPread.C
$(Pstreams)/OPwrite.C
$(Pstreams)/PstreamsPrint.C
dictionary = db/dictionary
$(dictionary)/dictionary.C

View file

@ -38,6 +38,8 @@ Foam::IPstream::IPstream
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
const int tag,
const label comm,
streamFormat format,
versionNumber version
)
@ -45,6 +47,8 @@ Foam::IPstream::IPstream
Pstream(commsType, bufSize),
Istream(format, version),
fromProcNo_(fromProcNo),
tag_(tag),
comm_(comm),
messageSize_(0)
{
setOpened();
@ -52,17 +56,31 @@ Foam::IPstream::IPstream
MPI_Status status;
// If the buffer size is not specified, probe the incomming message
// If the buffer size is not specified, probe the incoming message
// and set it
if (!bufSize)
{
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Probe
(
fromProcNo_,
tag_,
PstreamGlobals::MPICommunicators_[comm_],
&status
);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
buf_.setSize(messageSize_);
}
messageSize_ = read(commsType, fromProcNo_, buf_.begin(), buf_.size());
messageSize_ = IPstream::read
(
commsType,
fromProcNo_,
buf_.begin(),
buf_.size(),
tag_,
comm_
);
if (!messageSize_)
{
@ -83,9 +101,31 @@ Foam::label Foam::IPstream::read
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
const std::streamsize bufSize,
const int tag,
const label comm
)
{
if (debug)
{
Pout<< "UIPstream::read : starting read from:" << fromProcNo
<< " tag:" << tag << " comm:" << comm
<< " wanted size:" << label(bufSize)
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< Foam::endl;
}
if (Pstream::warnComm != -1 && comm != Pstream::warnComm)
{
Pout<< "UIPstream::read : starting read from:" << fromProcNo
<< " tag:" << tag << " comm:" << comm
<< " wanted size:" << label(bufSize)
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< " warnComm:" << Pstream::warnComm
<< Foam::endl;
error::printStack(Pout);
}
if (commsType == blocking || commsType == scheduled)
{
MPI_Status status;
@ -96,10 +136,10 @@ Foam::label Foam::IPstream::read
(
buf,
bufSize,
MPI_PACKED,
procID(fromProcNo),
msgType(),
MPI_COMM_WORLD,
MPI_BYTE,
fromProcNo,
tag,
PstreamGlobals::MPICommunicators_[comm],
&status
)
)
@ -144,10 +184,10 @@ Foam::label Foam::IPstream::read
(
buf,
bufSize,
MPI_PACKED,
procID(fromProcNo),
msgType(),
MPI_COMM_WORLD,
MPI_BYTE,
fromProcNo,
tag,
PstreamGlobals::MPICommunicators_[comm],
&request
)
)
@ -162,8 +202,18 @@ Foam::label Foam::IPstream::read
return 0;
}
PstreamGlobals::IPstream_outstandingRequests_.append(request);
if (debug)
{
Pout<< "UIPstream::read : started read from:" << fromProcNo
<< " tag:" << tag << " read size:" << label(bufSize)
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< " request:" << PstreamGlobals::outstandingRequests_.size()
<< Foam::endl;
}
PstreamGlobals::outstandingRequests_.append(request);
// Assume the message is completely received.
return 1;
}
else
@ -180,56 +230,4 @@ Foam::label Foam::IPstream::read
}
void Foam::IPstream::waitRequests()
{
if (PstreamGlobals::IPstream_outstandingRequests_.size())
{
if
(
MPI_Waitall
(
PstreamGlobals::IPstream_outstandingRequests_.size(),
PstreamGlobals::IPstream_outstandingRequests_.begin(),
MPI_STATUSES_IGNORE
)
)
{
FatalErrorIn
(
"IPstream::waitRequests()"
) << "MPI_Waitall returned with error" << endl;
}
PstreamGlobals::IPstream_outstandingRequests_.clear();
}
}
bool Foam::IPstream::finishedRequest(const label i)
{
if (i >= PstreamGlobals::IPstream_outstandingRequests_.size())
{
FatalErrorIn
(
"IPstream::finishedRequest(const label)"
) << "There are "
<< PstreamGlobals::IPstream_outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
int flag;
MPI_Test
(
&PstreamGlobals::IPstream_outstandingRequests_[i],
&flag,
MPI_STATUS_IGNORE
);
return flag != 0;
}
// ************************************************************************* //

View file

@ -1,187 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 4.0
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "IPstream.H"
#include "token.H"
#include <cctype>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
Istream& IPstream::read(token& t)
{
// Return the put back token if it exists
if (Istream::getBack(t))
{
return *this;
}
char c;
// return on error
if (!read(c))
{
t.setBad();
return *this;
}
// Set the line number of this token to the current stream line number
t.lineNumber() = lineNumber();
// Analyse input starting with this character.
switch (c)
{
// Punctuation
case token::END_STATEMENT :
case token::BEGIN_LIST :
case token::END_LIST :
case token::BEGIN_SQR :
case token::END_SQR :
case token::BEGIN_BLOCK :
case token::END_BLOCK :
case token::COLON :
case token::COMMA :
case token::ASSIGN :
case token::ADD :
case token::SUBTRACT :
case token::MULTIPLY :
case token::DIVIDE :
{
t = token::punctuationToken(c);
return *this;
}
// Word
case token::WORD :
{
word* wPtr = new word;
if (read(*wPtr))
{
if (token::compound::isCompound(*wPtr))
{
t = token::compound::New(*wPtr, *this).ptr();
delete wPtr;
}
else
{
t = wPtr;
}
}
else
{
delete wPtr;
t.setBad();
}
return *this;
}
// String
case token::STRING :
{
string* sPtr = new string;
if (read(*sPtr))
{
t = sPtr;
}
else
{
delete sPtr;
t.setBad();
}
return *this;
}
// Label
case token::LABEL :
{
label l;
if (read(l))
{
t = l;
}
else
{
t.setBad();
}
return *this;
}
// floatScalar
case token::FLOAT_SCALAR :
{
floatScalar s;
if (read(s))
{
t = s;
}
else
{
t.setBad();
}
return *this;
}
// doubleScalar
case token::DOUBLE_SCALAR :
{
doubleScalar s;
if (read(s))
{
t = s;
}
else
{
t.setBad();
}
return *this;
}
// Character (returned as a single character word) or error
default:
{
if (isalpha(c))
{
t = word(c);
return *this;
}
setBad();
t.setBad();
return *this;
}
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View file

@ -324,4 +324,13 @@ Foam::Istream& Foam::IPstream::rewind()
}
void Foam::IPstream::print(Ostream& os) const
{
os << "Reading from processor " << fromProcNo_
<< " using communicator " << comm_
<< " and tag " << tag_
<< Foam::endl;
}
// ************************************************************************* //

View file

@ -58,6 +58,12 @@ class IPstream
//- ID of sending processor
int fromProcNo_;
//- Message tag
const int tag_;
//- Communicator
const label comm_;
//- Message size
label messageSize_;
@ -86,12 +92,14 @@ public:
const commsTypes commsType,
const int fromProcNo,
const label bufSize = 0,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm,
streamFormat format = BINARY,
versionNumber version = currentVersion
);
// Destructor
//- Destructor
~IPstream();
@ -115,15 +123,11 @@ public:
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
const std::streamsize bufSize,
const int tag = Pstream::msgType(),
const label communicator = 0
);
//- Non-blocking receives: wait until all have finished.
static void waitRequests();
//- Non-blocking receives: has request i finished?
static bool finishedRequest(const label i);
//- Return next token from stream
Istream& read(token&);

View file

@ -92,13 +92,17 @@ Foam::OPstream::OPstream
const commsTypes commsType,
const int toProcNo,
const label bufSize,
const int tag,
const label comm,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
Ostream(format, version),
toProcNo_(toProcNo)
toProcNo_(toProcNo),
tag_(tag),
comm_(comm)
{
setOpened();
setGood();
@ -233,4 +237,12 @@ Foam::Ostream& Foam::OPstream::write(const char* data, std::streamsize count)
}
void Foam::OPstream::print(Ostream& os) const
{
os << "Writing from processor " << toProcNo_
<< " to processor " << myProcNo() << " in communicator " << comm_
<< " and tag " << tag_ << Foam::endl;
}
// ************************************************************************* //

View file

@ -58,6 +58,12 @@ class OPstream
// ID of receiving processor
int toProcNo_;
//- Message tag
const int tag_;
//- Communicator
const label comm_;
// Private member functions
@ -88,13 +94,14 @@ public:
const commsTypes commsType,
const int toProcNo,
const label bufSize = 0,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm,
streamFormat format = BINARY,
versionNumber version = currentVersion
);
// Destructor
//- Destructor
~OPstream();
@ -117,15 +124,11 @@ public:
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
const std::streamsize bufSize,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Non-blocking writes: wait until all have finished.
static void waitRequests();
//- Non-blocking writes: has request i finished?
static bool finishedRequest(const label i);
//- Write next token to stream
Ostream& write(const token&);

View file

@ -60,9 +60,33 @@ bool Foam::OPstream::write
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
const std::streamsize bufSize,
const int tag,
const label comm
)
{
if (debug)
{
Pout<< "OPstream::write : starting write to:" << toProcNo
<< " tag:" << tag
<< " comm:" << comm << " size:" << label(bufSize)
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< Foam::endl;
}
if (Pstream::warnComm != -1 && comm != Pstream::warnComm)
{
Pout<< "OPstream::write : starting write to:" << toProcNo
<< " tag:" << tag
<< " comm:" << comm << " size:" << label(bufSize)
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< " warnComm:" << Pstream::warnComm
<< Foam::endl;
error::printStack(Pout);
}
PstreamGlobals::checkCommunicator(comm, toProcNo);
bool transferFailed = true;
if (commsType == blocking)
@ -71,11 +95,19 @@ bool Foam::OPstream::write
(
const_cast<char*>(buf),
bufSize,
MPI_PACKED,
procID(toProcNo),
msgType(),
MPI_COMM_WORLD
MPI_BYTE,
toProcNo, //procID(toProcNo),
tag,
PstreamGlobals::MPICommunicators_[comm] // MPI_COMM_WORLD
);
if (debug)
{
Pout<< "OPstream::write : finished write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< Foam::endl;
}
}
else if (commsType == scheduled)
{
@ -83,11 +115,19 @@ bool Foam::OPstream::write
(
const_cast<char*>(buf),
bufSize,
MPI_PACKED,
procID(toProcNo),
msgType(),
MPI_COMM_WORLD
MPI_BYTE,
toProcNo, //procID(toProcNo),
tag,
PstreamGlobals::MPICommunicators_[comm] // MPI_COMM_WORLD
);
if (debug)
{
Pout<< "OPstream::write : finished write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< Foam::endl;
}
}
else if (commsType == nonBlocking)
{
@ -97,14 +137,23 @@ bool Foam::OPstream::write
(
const_cast<char*>(buf),
bufSize,
MPI_PACKED,
procID(toProcNo),
msgType(),
MPI_COMM_WORLD,
MPI_BYTE,
toProcNo, //procID(toProcNo),
tag,
PstreamGlobals::MPICommunicators_[comm],// MPI_COMM_WORLD,
&request
);
PstreamGlobals::OPstream_outstandingRequests_.append(request);
if (debug)
{
Pout<< "OPstream::write : started write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commsType:" << Pstream::commsTypeNames[commsType]
<< " request:" << PstreamGlobals::outstandingRequests_.size()
<< Foam::endl;
}
PstreamGlobals::outstandingRequests_.append(request);
}
else
{
@ -120,56 +169,4 @@ bool Foam::OPstream::write
}
void Foam::OPstream::waitRequests()
{
if (PstreamGlobals::OPstream_outstandingRequests_.size())
{
if
(
MPI_Waitall
(
PstreamGlobals::OPstream_outstandingRequests_.size(),
PstreamGlobals::OPstream_outstandingRequests_.begin(),
MPI_STATUSES_IGNORE
)
)
{
FatalErrorIn
(
"OPstream::waitRequests()"
) << "MPI_Waitall returned with error" << Foam::endl;
}
PstreamGlobals::OPstream_outstandingRequests_.clear();
}
}
bool Foam::OPstream::finishedRequest(const label i)
{
if (i >= PstreamGlobals::OPstream_outstandingRequests_.size())
{
FatalErrorIn
(
"OPstream::finishedRequest(const label)"
) << "There are "
<< PstreamGlobals::OPstream_outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
int flag;
MPI_Test
(
&PstreamGlobals::OPstream_outstandingRequests_[i],
&flag,
MPI_STATUS_IGNORE
);
return flag != 0;
}
// ************************************************************************* //

File diff suppressed because it is too large Load diff

View file

@ -29,11 +29,12 @@ Description
SourceFiles
Pstream.C
PstreamsPrint.C
PstreamReduceOps.C
PstreamCommsStruct.C
gatherScatter.C
combineGatherScatter.C
gatherScatterList.C
PstreamExchange.C
\*---------------------------------------------------------------------------*/
@ -47,6 +48,8 @@ SourceFiles
#include "NamedEnum.H"
#include "dynamicLabelList.H"
#include "optimisationSwitch.H"
#include "ListOps.H"
#include "LIFOStack.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -59,7 +62,6 @@ namespace Foam
class Pstream
{
public:
//- Types of communications
@ -72,6 +74,7 @@ public:
static const NamedEnum<commsTypes, 3> commsTypeNames;
// Public classes
//- Structure for communicating between processors
@ -85,7 +88,7 @@ public:
//- procIDs of processors directly below me
labelList below_;
//- procIDs of all processors below (so not just directly below)
//- procIDs of all processors below (not just directly below)
labelList allBelow_;
//- procIDs of all processors not below. (inverse set of
@ -183,32 +186,38 @@ private:
//- Is this a parallel run?
static bool parRun_;
//- My processor index
static int myProcNo_;
//- Process IDs
static List<int> procIDs_;
//- Default message type
//- Default message type info
static const int msgType_;
//- Stack of free comms
static LIFOStack<label> freeComms_;
//- My processor index
static DynamicList<int> myProcNo_;
//- Process IDs
static DynamicList<List<int> > procIDs_;
//- List of parent communicators
static DynamicList<label> parentCommunicator_;
//- Structure for linear communications
static List<commsStruct> linearCommunication_;
static DynamicList<List<commsStruct> > linearCommunication_;
//- Structure for tree communications
static List<commsStruct> treeCommunication_;
static DynamicList<List<commsStruct> > treeCommunication_;
// Private member functions
//- Set data for parallel running
static void setParRun();
static void setParRun(const label nProcs);
//- Calculate linear communication schedule
static void calcLinearComm(const label nProcs);
static List<commsStruct> calcLinearComm(const label nProcs);
//- Calculate tree communication schedule
static void calcTreeComm(const label nProcs);
static List<commsStruct> calcTreeComm(const label nProcs);
//- Helper function for tree communication schedule determination
// Collects all processorIDs below a processor
@ -223,6 +232,19 @@ private:
// Pstream::init()
static void initCommunicationSchedule();
//- Allocate a communicator with index
static void allocatePstreamCommunicator
(
const label parentIndex,
const label index
);
//- Free a communicator
static void freePstreamCommunicator
(
const label index
);
protected:
@ -268,6 +290,15 @@ public:
);
}
//- Number of polling cycles in processor updates
static const debug::optimisationSwitch nPollProcInterfaces;
//- Default communicator (all processors)
static label worldComm;
//- Debugging: warn for use of any communicator differing from warnComm
static label warnComm;
// Constructors
@ -290,6 +321,79 @@ public:
// Member functions
//- Allocate a new communicator
static label allocateCommunicator
(
const label parent,
const labelList& subRanks,
const bool doPstream = true
);
//- Free a previously allocated communicator
static void freeCommunicator
(
const label communicator,
const bool doPstream = true
);
//- Free all communicators
static void freeCommunicators(const bool doPstream);
//- Helper class for allocating/freeing communicators
class communicator
{
//- Communicator identifier
label comm_;
//- Disallow copy and assignment
communicator(const communicator&);
void operator=(const communicator&);
public:
//- Constructo from components
communicator
(
const label parent,
const labelList& subRanks,
const bool doPstream
)
:
comm_(allocateCommunicator(parent, subRanks, doPstream))
{}
//- Destructor
~communicator()
{
freeCommunicator(comm_);
}
//- Cast to label
operator label() const
{
return comm_;
}
};
//- Return physical processor number (i.e. processor number in
// worldComm) given communicator and processor
static int baseProcNo(const label myComm, const int procID);
//- Return processor number in communicator (given physical processor
// number) (= reverse of baseProcNo)
static label procNo(const label comm, const int baseProcID);
//- Return processor number in communicator (given processor number
// and communicator)
static label procNo
(
const label myComm,
const label currentComm,
const int currentProcID
);
//- Add the valid option this type of communications library
// adds/requires on the command line
static void addValidParOptions(HashTable<string>& validParOptions);
@ -298,44 +402,78 @@ public:
// Spawns slave processes and initialises inter-communication
static bool init(int& argc, char**& argv);
// Non-blocking comms
//- Get number of outstanding requests
static label nRequests();
//- Truncate number of outstanding requests
static void resetRequests(const label sz);
//- Wait until all requests (from start onwards) have finished.
static void waitRequests(const label start = 0);
//- Wait until request i has finished.
static void waitRequest(const label i);
//- Non-blocking comms: has request i finished?
static bool finishedRequest(const label i);
static int allocateTag(const char*);
static int allocateTag(const word&);
static void freeTag(const char*, const int tag);
static void freeTag(const word&, const int tag);
//- Is this a parallel run?
static bool& parRun()
{
return parRun_;
}
//- Number of processes in parallel run
static label nProcs()
//- Number of processes in parallel run for a given communicator
static label nProcs(const label communicator = 0)
{
return procIDs_.size();
return procIDs_[communicator].size();
}
//- Am I the master process
static bool master()
{
return myProcNo_ == masterNo();
}
//- Process index of the master
//- Process index of the master for the global communicator
static int masterNo()
{
return 0;
}
//- Number of this process (starting from masterNo() = 0)
static int myProcNo()
//- Am I the master process
static bool master(const label communicator = 0)
{
return myProcNo_;
return myProcNo_[communicator] == masterNo();
}
//- Number of this process (starting from masterNo() = 0)
static int myProcNo(const label communicator = 0)
{
return myProcNo_[communicator];
}
//- Return parent communicator
static label parent(const label communicator)
{
return parentCommunicator_(communicator);
}
//- Process IDs
static const List<int>& procIDs()
static const List<int>& procIDs(const int communicator)
{
return procIDs_;
return procIDs_[communicator];
}
//- Process ID of given process index
static int procID(int procNo)
static List<int>& procID(const int procNo)
{
return procIDs_[procNo];
}
@ -347,21 +485,27 @@ public:
}
//- Process index of last slave
static int lastSlave()
static int lastSlave(const label communicator = 0)
{
return nProcs() - 1;
return nProcs(communicator) - 1;
}
//- Communication schedule for linear all-to-master (proc 0)
static const List<commsStruct>& linearCommunication()
static const List<commsStruct>& linearCommunication
(
const label communicator = 0
)
{
return linearCommunication_;
return linearCommunication_[communicator];
}
//- Communication schedule for tree all-to-master (proc 0)
static const List<commsStruct>& treeCommunication()
static const List<commsStruct>& treeCommunication
(
const label communicator = 0
)
{
return treeCommunication_;
return treeCommunication_[communicator];
}
//- Message tag of standard messages
@ -400,21 +544,40 @@ public:
(
const List<commsStruct>& comms,
T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class T, class BinaryOp>
static void gather(T& Value, const BinaryOp& bop);
static void gather
(
T& Value,
const BinaryOp& bop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Scatter data. Distribute without modification.
// Reverse of gather
template <class T>
static void scatter(const List<commsStruct>& comms, T& Value);
static void scatter
(
const List<commsStruct>& comms,
T& Value,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class T>
static void scatter(T& Value);
static void scatter
(
T& Value,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
// Combine variants. Inplace combine values from processors.
@ -425,24 +588,39 @@ public:
(
const List<commsStruct>& comms,
T& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class T, class CombineOp>
static void combineGather(T& Value, const CombineOp& cop);
static void combineGather
(
T& Value,
const CombineOp& cop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Scatter data. Reverse of combineGather
template <class T>
static void combineScatter
(
const List<commsStruct>& comms,
T& Value
T& Value,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class T>
static void combineScatter(T& Value);
static void combineScatter
(
T& Value,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
// Combine variants working on whole List at a time.
@ -451,7 +629,9 @@ public:
(
const List<commsStruct>& comms,
List<T>& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -459,7 +639,9 @@ public:
static void listCombineGather
(
List<T>& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Scatter data. Reverse of combineGather
@ -467,12 +649,19 @@ public:
static void listCombineScatter
(
const List<commsStruct>& comms,
List<T>& Value
List<T>& Value,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class T>
static void listCombineScatter(List<T>& Value);
static void listCombineScatter
(
List<T>& Value,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
// Combine variants working on whole map at a time. Container needs to
// have iterators and find() defined.
@ -482,7 +671,9 @@ public:
(
const List<commsStruct>& comms,
Container& Values,
const CombineOp& cop
const CombineOp& cop,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -490,7 +681,9 @@ public:
static void mapCombineGather
(
Container& Values,
const CombineOp& cop
const CombineOp& cop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Scatter data. Reverse of combineGather
@ -498,13 +691,19 @@ public:
static void mapCombineScatter
(
const List<commsStruct>& comms,
Container& Values
Container& Values,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class Container>
static void mapCombineScatter(Container& Values);
static void mapCombineScatter
(
Container& Values,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
// Gather/scatter keeping the individual processor data separate.
@ -516,24 +715,56 @@ public:
static void gatherList
(
const List<commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class T>
static void gatherList(List<T>& Values);
static void gatherList
(
List<T>& Values,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Scatter data. Reverse of gatherList
template <class T>
static void scatterList
(
const List<commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class T>
static void scatterList(List<T>& Values);
static void scatterList
(
List<T>& Values,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
// Exchange
//- Exchange data. Sends sendData, receives into recvData, sets
// sizes (not bytes). sizes[p0][p1] is what processor p0 has
// sent to p1. Continuous data only.
// If block=true will wait for all transfers to finish.
template<class Container, class T>
static void exchange
(
const List<Container>&,
List<Container>&,
labelListList& sizes,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm,
const bool block = true
);
};
@ -556,6 +787,7 @@ Ostream& operator<<(Ostream&, const Pstream::commsStruct&);
# include "gatherScatter.C"
# include "combineGatherScatter.C"
# include "gatherScatterList.C"
# include "PstreamExchange.C"
#endif

View file

@ -50,26 +50,62 @@ void combineReduce
(
const List<Pstream::commsStruct>& comms,
T& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag,
const label comm
)
{
Pstream::combineGather(comms, Value, cop);
Pstream::combineScatter(comms, Value);
Pstream::combineGather(comms, Value, cop, tag, comm);
Pstream::combineScatter(comms, Value, tag, comm);
}
template <class T, class CombineOp>
void combineReduce(T& Value, const CombineOp& cop)
void combineReduce
(
T& Value,
const CombineOp& cop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
{
Pstream::combineGather(Pstream::linearCommunication(), Value, cop);
Pstream::combineScatter(Pstream::linearCommunication(), Value);
Pstream::combineGather
(
Pstream::linearCommunication(comm),
Value,
cop,
tag,
comm
);
Pstream::combineScatter
(
Pstream::linearCommunication(comm),
Value,
tag,
comm
);
}
else
{
Pstream::combineGather(Pstream::treeCommunication(), Value, cop);
Pstream::combineScatter(Pstream::treeCommunication(), Value);
Pstream::combineGather
(
Pstream::treeCommunication(comm),
Value,
cop,
tag,
comm
);
Pstream::combineScatter
(
Pstream::treeCommunication(comm),
Value,
tag,
comm
);
}
}

View file

@ -0,0 +1,159 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 4.0
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Pstream exchange data.
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "contiguous.H"
#include "PstreamCombineReduceOps.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
//template<template<class> class ListType, class T>
template<class Container, class T>
void Pstream::exchange
(
const List<Container>& sendBufs,
List<Container>& recvBufs,
labelListList& sizes,
const int tag,
const label comm,
const bool block
)
{
if (!contiguous<T>())
{
FatalErrorIn
(
"Pstream::exchange(..)"
) << "Continuous data only." << Foam::abort(FatalError);
}
if (sendBufs.size() != Pstream::nProcs(comm))
{
FatalErrorIn
(
"Pstream::exchange(..)"
) << "Size of list:" << sendBufs.size()
<< " does not equal the number of processors:"
<< Pstream::nProcs(comm)
<< Foam::abort(FatalError);
}
sizes.setSize(Pstream::nProcs(comm));
labelList& nsTransPs = sizes[Pstream::myProcNo(comm)];
nsTransPs.setSize(Pstream::nProcs(comm));
forAll (sendBufs, procI)
{
nsTransPs[procI] = sendBufs[procI].size();
}
// Send sizes across. Note: blocks.
combineReduce(sizes, Pstream::listEq(), tag, comm);
if (Pstream::nProcs(comm) > 1)
{
label startOfRequests = Pstream::nRequests();
// Set up receives
// ~~~~~~~~~~~~~~~
recvBufs.setSize(sendBufs.size());
forAll (sizes, procI)
{
label nRecv = sizes[procI][Pstream::myProcNo(comm)];
if (procI != Pstream::myProcNo(comm) && nRecv > 0)
{
recvBufs[procI].setSize(nRecv);
IPstream::read
(
Pstream::nonBlocking,
procI,
reinterpret_cast<char*>(recvBufs[procI].begin()),
nRecv*sizeof(T),
tag,
comm
);
}
}
// Set up sends
// ~~~~~~~~~~~~
forAll (sendBufs, procI)
{
if (procI != Pstream::myProcNo(comm) && sendBufs[procI].size() > 0)
{
if
(
!OPstream::write
(
Pstream::nonBlocking,
procI,
reinterpret_cast<const char*>(sendBufs[procI].begin()),
sendBufs[procI].size()*sizeof(T),
tag,
comm
)
)
{
FatalErrorIn("Pstream::exchange(..)")
<< "Cannot send outgoing message. "
<< "to:" << procI << " nBytes:"
<< label(sendBufs[procI].size()*sizeof(T))
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
if (block)
{
Pstream::waitRequests(startOfRequests);
}
}
// Do myself
recvBufs[Pstream::myProcNo(comm)] = sendBufs[Pstream::myProcNo(comm)];
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View file

@ -33,10 +33,50 @@ namespace Foam
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Outstanding non-blocking operations.
//! @cond fileScope
DynamicList<MPI_Request> PstreamGlobals::IPstream_outstandingRequests_;
DynamicList<MPI_Request> PstreamGlobals::OPstream_outstandingRequests_;
//! @endcond
//! \cond fileScope
DynamicList<MPI_Request> PstreamGlobals::outstandingRequests_;
//! \endcond
// Max outstanding message tag operations.
//! \cond fileScope
int PstreamGlobals::nTags_ = 0;
//! \endcond
// Free'd message tags
//! \cond fileScope
DynamicList<int> PstreamGlobals::freedTags_;
//! \endcond
// Allocated communicators.
//! \cond fileScope
DynamicList<MPI_Comm> PstreamGlobals::MPICommunicators_;
DynamicList<MPI_Group> PstreamGlobals::MPIGroups_;
//! \endcond
void PstreamGlobals::checkCommunicator
(
const label comm,
const label otherProcNo
)
{
if
(
comm < 0
|| comm >= PstreamGlobals::MPICommunicators_.size()
)
{
FatalErrorIn
(
"PstreamGlobals::checkCommunicator(const label, const label)"
) << "otherProcNo:" << otherProcNo << " : illegal communicator "
<< comm << endl
<< "Communicator should be within range 0.."
<< PstreamGlobals::MPICommunicators_.size() - 1
<< abort(FatalError);
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View file

@ -52,8 +52,18 @@ namespace Foam
namespace PstreamGlobals
{
extern DynamicList<MPI_Request> IPstream_outstandingRequests_;
extern DynamicList<MPI_Request> OPstream_outstandingRequests_;
extern DynamicList<MPI_Request> outstandingRequests_;
extern int nTags_;
extern DynamicList<int> freedTags_;
// Current communicators. First element will be MPI_COMM_WORLD
extern DynamicList<MPI_Comm> MPICommunicators_;
extern DynamicList<MPI_Group> MPIGroups_;
void checkCommunicator(const label, const label procNo);
};

View file

@ -21,28 +21,11 @@ License
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Prints out a description of the streams
\*---------------------------------------------------------------------------*/
#include "IPstream.H"
#include "OPstream.H"
#include "PstreamReduceOps.H"
#include "allReduce.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
void Foam::IPstream::print(Ostream& os) const
{
os << "Reading from processor " << fromProcNo_
<< " to processor " << myProcNo() << Foam::endl;
}
void Foam::OPstream::print(Ostream& os) const
{
os << "Writing from processor " << toProcNo_
<< " to processor " << myProcNo() << Foam::endl;
}
// ************************************************************************* //

View file

@ -28,6 +28,7 @@ License
#include "Pstream.H"
#include "ops.H"
#include "vector2D.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -42,11 +43,20 @@ void reduce
(
const List<Pstream::commsStruct>& comms,
T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag,
const label comm
)
{
Pstream::gather(comms, Value, bop);
Pstream::scatter(comms, Value);
if (Pstream::warnComm != -1 && comm != Pstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << comm
<< endl;
error::printStack(Pout);
}
Pstream::gather(comms, Value, bop, tag, comm);
Pstream::scatter(comms, Value, tag, comm);
}
@ -55,16 +65,18 @@ template <class T, class BinaryOp>
void reduce
(
T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
reduce(Pstream::linearCommunication(), Value, bop);
reduce(Pstream::linearCommunication(comm), Value, bop, tag, comm);
}
else
{
reduce(Pstream::treeCommunication(), Value, bop);
reduce(Pstream::treeCommunication(comm), Value, bop, tag, comm);
}
}
@ -74,26 +86,100 @@ template <class T, class BinaryOp>
T returnReduce
(
const T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
)
{
T WorkValue(Value);
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
reduce(Pstream::linearCommunication(), WorkValue, bop);
reduce(Pstream::linearCommunication(comm), WorkValue, bop, tag, comm);
}
else
{
reduce(Pstream::treeCommunication(), WorkValue, bop);
reduce(Pstream::treeCommunication(comm), WorkValue, bop, tag, comm);
}
return WorkValue;
}
// Insist there is a specialisation for the reduction of a scalar
void reduce(scalar& Value, const sumOp<scalar>& bop);
// Reduce with sum of both value and count (for averaging)
template<class T>
void sumReduce
(
T& Value,
label& Count,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
)
{
reduce(Value, sumOp<T>(), tag, comm);
reduce(Count, sumOp<label>(), tag, comm);
}
// Non-blocking version of reduce. Sets request.
template<class T, class BinaryOp>
void reduce
(
T& Value,
const BinaryOp& bop,
const int tag,
const label comm,
label& request
)
{
notImplemented
(
"reduce(T&, const BinaryOp&, const int, const label, label&"
);
}
// Insist there are specialisations for the common reductions of scalar(s)
void reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
void reduce
(
scalar& Value,
const minOp<scalar>& bop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
void reduce
(
vector2D& Value,
const sumOp<vector2D>& bop,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
void sumReduce
(
scalar& Value,
label& Count,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
void reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label comm,
label& request
);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View file

@ -0,0 +1,71 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 4.0
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
Description
Various functions to wrap MPI_Allreduce
SourceFiles
allReduceTemplates.C
\*---------------------------------------------------------------------------*/
#ifndef allReduce_H
#define allReduce_H
#include "mpi.h"
#include "Pstream.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
template<class Type, class BinaryOp>
void allReduce
(
Type& Value,
int count,
MPI_Datatype MPIType,
MPI_Op op,
const BinaryOp& bop,
const int tag,
const int communicator
);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#ifdef NoRepository
# include "allReduceTemplates.C"
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //

View file

@ -0,0 +1,211 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | foam-extend: Open Source CFD
\\ / O peration | Version: 4.0
\\ / A nd | Web: http://www.foam-extend.org
\\/ M anipulation | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
This file is part of foam-extend.
foam-extend is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
foam-extend is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with foam-extend. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "allReduce.H"
// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
template<class Type, class BinaryOp>
void Foam::allReduce
(
Type& Value,
int MPICount,
MPI_Datatype MPIType,
MPI_Op MPIOp,
const BinaryOp& bop,
const int tag,
const label comm
)
{
if (!Pstream::parRun())
{
return;
}
if (Pstream::nProcs(comm) <= Pstream::nProcsSimpleSum)
{
if (Pstream::master(comm))
{
for
(
int slave = Pstream::firstSlave();
slave <= Pstream::lastSlave(comm);
slave++
)
{
Type value;
if
(
MPI_Recv
(
&value,
MPICount,
MPIType,
slave, //Pstream::procID(slave),
tag,
PstreamGlobals::MPICommunicators_[comm],
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"void Foam::allReduce\n"
"(\n"
" Type&,\n"
" int,\n"
" MPI_Datatype,\n"
" MPI_Op,\n"
" const BinaryOp&,\n"
" const int\n"
")\n"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
}
else
{
if
(
MPI_Send
(
&Value,
MPICount,
MPIType,
Pstream::masterNo(),//Pstream::procID(masterNo()),
tag,
PstreamGlobals::MPICommunicators_[comm]
)
)
{
FatalErrorIn
(
"void Foam::allReduce\n"
"(\n"
" Type&,\n"
" int,\n"
" MPI_Datatype,\n"
" MPI_Op,\n"
" const BinaryOp&,\n"
" const int\n"
")\n"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
if (Pstream::master(comm))
{
for
(
int slave = Pstream::firstSlave();
slave <= Pstream::lastSlave(comm);
slave++
)
{
if
(
MPI_Send
(
&Value,
MPICount,
MPIType,
slave, //Pstream::procID(slave),
tag,
PstreamGlobals::MPICommunicators_[comm]
)
)
{
FatalErrorIn
(
"void Foam::allReduce\n"
"(\n"
" Type&,\n"
" int,\n"
" MPI_Datatype,\n"
" MPI_Op,\n"
" const BinaryOp&,\n"
" const int\n"
")\n"
) << "MPI_Send failed"
<< Foam::abort(FatalError);
}
}
}
else
{
if
(
MPI_Recv
(
&Value,
MPICount,
MPIType,
Pstream::masterNo(),//Pstream::procID(masterNo()),
tag,
PstreamGlobals::MPICommunicators_[comm],
MPI_STATUS_IGNORE
)
)
{
FatalErrorIn
(
"void Foam::allReduce\n"
"(\n"
" Type&,\n"
" int,\n"
" MPI_Datatype,\n"
" MPI_Op,\n"
" const BinaryOp&,\n"
" const int\n"
")\n"
) << "MPI_Recv failed"
<< Foam::abort(FatalError);
}
}
}
else
{
Type sum;
MPI_Allreduce
(
&Value,
&sum,
MPICount,
MPIType,
MPIOp,
PstreamGlobals::MPICommunicators_[comm]
);
Value = sum;
}
}
// ************************************************************************* //

View file

@ -50,13 +50,15 @@ void Pstream::combineGather
(
const List<Pstream::commsStruct>& comms,
T& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll (myComm.below(), belowI)
@ -71,7 +73,9 @@ void Pstream::combineGather
Pstream::scheduled,
belowID,
reinterpret_cast<char*>(&value),
sizeof(T)
sizeof(T),
tag,
comm
);
if (debug > 1)
@ -84,7 +88,7 @@ void Pstream::combineGather
}
else
{
IPstream fromBelow(Pstream::scheduled, belowID);
IPstream fromBelow(Pstream::scheduled, belowID, 0, tag, comm);
T value(fromBelow);
if (debug > 1)
@ -113,12 +117,21 @@ void Pstream::combineGather
Pstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(&Value),
sizeof(T)
sizeof(T),
tag,
comm
);
}
else
{
OPstream toAbove(Pstream::scheduled, myComm.above());
OPstream toAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
toAbove << Value;
}
}
@ -127,26 +140,52 @@ void Pstream::combineGather
template <class T, class CombineOp>
void Pstream::combineGather(T& Value, const CombineOp& cop)
void Pstream::combineGather
(
T& Value,
const CombineOp& cop,
const int tag,
const label comm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
combineGather(Pstream::linearCommunication(), Value, cop);
combineGather
(
Pstream::linearCommunication(comm),
Value,
cop,
tag,
comm
);
}
else
{
combineGather(Pstream::treeCommunication(), Value, cop);
combineGather
(
Pstream::treeCommunication(comm),
Value,
cop,
tag,
comm
);
}
}
template <class T>
void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
void Pstream::combineScatter
(
const List<Pstream::commsStruct>& comms,
T& Value,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
// Get my communication order
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()];
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -158,12 +197,21 @@ void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
Pstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(&Value),
sizeof(T)
sizeof(T),
tag,
comm
);
}
else
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
Value = T(fromAbove);
}
@ -191,12 +239,14 @@ void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
Pstream::scheduled,
belowID,
reinterpret_cast<const char*>(&Value),
sizeof(T)
sizeof(T),
tag,
comm
);
}
else
{
OPstream toBelow(Pstream::scheduled, belowID);
OPstream toBelow(Pstream::scheduled, belowID, 0, tag, comm);
toBelow << Value;
}
}
@ -205,15 +255,20 @@ void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
template <class T>
void Pstream::combineScatter(T& Value)
void Pstream::combineScatter
(
T& Value,
const int tag,
const label comm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
combineScatter(Pstream::linearCommunication(), Value);
combineScatter(Pstream::linearCommunication(comm), Value, tag, comm);
}
else
{
combineScatter(Pstream::treeCommunication(), Value);
combineScatter(Pstream::treeCommunication(comm), Value, tag, comm);
}
}
@ -227,13 +282,15 @@ void Pstream::listCombineGather
(
const List<Pstream::commsStruct>& comms,
List<T>& Values,
const CombineOp& cop
const CombineOp& cop,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll (myComm.below(), belowI)
@ -249,7 +306,9 @@ void Pstream::listCombineGather
Pstream::scheduled,
belowID,
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
receivedValues.byteSize(),
tag,
comm
);
if (debug > 1)
@ -265,7 +324,7 @@ void Pstream::listCombineGather
}
else
{
IPstream fromBelow(Pstream::scheduled, belowID);
IPstream fromBelow(Pstream::scheduled, belowID, 0, tag, comm);
List<T> receivedValues(fromBelow);
if (debug > 1)
@ -297,12 +356,21 @@ void Pstream::listCombineGather
Pstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(Values.begin()),
Values.byteSize()
Values.byteSize(),
tag,
comm
);
}
else
{
OPstream toAbove(Pstream::scheduled, myComm.above());
OPstream toAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
toAbove << Values;
}
}
@ -311,15 +379,35 @@ void Pstream::listCombineGather
template <class T, class CombineOp>
void Pstream::listCombineGather(List<T>& Values, const CombineOp& cop)
void Pstream::listCombineGather
(
List<T>& Values,
const CombineOp& cop,
const int tag,
const label comm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
listCombineGather(Pstream::linearCommunication(), Values, cop);
listCombineGather
(
Pstream::linearCommunication(comm),
Values,
cop,
tag,
comm
);
}
else
{
listCombineGather(Pstream::treeCommunication(), Values, cop);
listCombineGather
(
Pstream::treeCommunication(comm),
Values,
cop,
tag,
comm
);
}
}
@ -328,13 +416,15 @@ template <class T>
void Pstream::listCombineScatter
(
const List<Pstream::commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
// Get my communication order
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()];
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -346,12 +436,21 @@ void Pstream::listCombineScatter
Pstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(Values.begin()),
Values.byteSize()
Values.byteSize(),
tag,
comm
);
}
else
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> Values;
}
@ -379,12 +478,14 @@ void Pstream::listCombineScatter
Pstream::scheduled,
belowID,
reinterpret_cast<const char*>(Values.begin()),
Values.byteSize()
Values.byteSize(),
tag,
comm
);
}
else
{
OPstream toBelow(Pstream::scheduled, belowID);
OPstream toBelow(Pstream::scheduled, belowID, 0, tag, comm);
toBelow << Values;
}
}
@ -393,44 +494,60 @@ void Pstream::listCombineScatter
template <class T>
void Pstream::listCombineScatter(List<T>& Values)
void Pstream::listCombineScatter
(
List<T>& Values,
const int tag,
const label comm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
listCombineScatter(Pstream::linearCommunication(), Values);
listCombineScatter
(
Pstream::linearCommunication(),
Values,
tag,
comm
);
}
else
{
listCombineScatter(Pstream::treeCommunication(), Values);
listCombineScatter
(
Pstream::treeCommunication(),
Values,
tag,
comm
);
}
}
// Same thing but for sparse list (map)
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
template <class Container, class CombineOp>
void Pstream::mapCombineGather
(
const List<Pstream::commsStruct>& comms,
Container& Values,
const CombineOp& cop
const CombineOp& cop,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll (myComm.below(), belowI)
{
label belowID = myComm.below()[belowI];
IPstream fromBelow(Pstream::scheduled, belowID);
IPstream fromBelow(Pstream::scheduled, belowID, 0, tag, comm);
Container receivedValues(fromBelow);
if (debug > 1)
@ -470,7 +587,7 @@ void Pstream::mapCombineGather
<< " data:" << Values << endl;
}
OPstream toAbove(Pstream::scheduled, myComm.above());
OPstream toAbove(Pstream::scheduled, myComm.above(), 0, tag, comm);
toAbove << Values;
}
}
@ -478,15 +595,35 @@ void Pstream::mapCombineGather
template <class Container, class CombineOp>
void Pstream::mapCombineGather(Container& Values, const CombineOp& cop)
void Pstream::mapCombineGather
(
Container& Values,
const CombineOp& cop,
const int tag,
const label comm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
mapCombineGather(Pstream::linearCommunication(), Values, cop);
mapCombineGather
(
Pstream::linearCommunication(),
Values,
cop,
tag,
comm
);
}
else
{
mapCombineGather(Pstream::treeCommunication(), Values, cop);
mapCombineGather
(
Pstream::treeCommunication(),
Values,
cop,
tag,
comm
);
}
}
@ -495,18 +632,27 @@ template <class Container>
void Pstream::mapCombineScatter
(
const List<Pstream::commsStruct>& comms,
Container& Values
Container& Values,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
// Get my communication order
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()];
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> Values;
if (debug > 1)
@ -526,7 +672,7 @@ void Pstream::mapCombineScatter
Pout<< " sending to " << belowID << " data:" << Values << endl;
}
OPstream toBelow(Pstream::scheduled, belowID);
OPstream toBelow(Pstream::scheduled, belowID, 0, tag, comm);
toBelow << Values;
}
}
@ -534,15 +680,32 @@ void Pstream::mapCombineScatter
template <class Container>
void Pstream::mapCombineScatter(Container& Values)
void Pstream::mapCombineScatter
(
Container& Values,
const int tag,
const label comm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
mapCombineScatter(Pstream::linearCommunication(), Values);
mapCombineScatter
(
Pstream::linearCommunication(),
Values,
tag,
comm
);
}
else
{
mapCombineScatter(Pstream::treeCommunication(), Values);
mapCombineScatter
(
Pstream::treeCommunication(),
Values,
tag,
comm
);
}
}

View file

@ -45,13 +45,15 @@ void Pstream::gather
(
const List<Pstream::commsStruct>& comms,
T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll (myComm.below(), belowI)
@ -65,12 +67,21 @@ void Pstream::gather
Pstream::scheduled,
myComm.below()[belowI],
reinterpret_cast<char*>(&value),
sizeof(T)
sizeof(T),
tag,
comm
);
}
else
{
IPstream fromBelow(Pstream::scheduled, myComm.below()[belowI]);
IPstream fromBelow
(
Pstream::scheduled,
myComm.below()[belowI],
0,
tag,
comm
);
fromBelow >> value;
}
@ -87,12 +98,21 @@ void Pstream::gather
Pstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(&Value),
sizeof(T)
sizeof(T),
tag,
comm
);
}
else
{
OPstream toAbove(Pstream::scheduled, myComm.above());
OPstream toAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
toAbove << Value;
}
}
@ -101,26 +121,38 @@ void Pstream::gather
template <class T, class BinaryOp>
void Pstream::gather(T& Value, const BinaryOp& bop)
void Pstream::gather
(
T& Value,
const BinaryOp& bop,
const int tag,
const label comm
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
gather(Pstream::linearCommunication(), Value, bop);
gather(Pstream::linearCommunication(comm), Value, bop, tag, comm);
}
else
{
gather(Pstream::treeCommunication(), Value, bop);
gather(Pstream::treeCommunication(comm), Value, bop, tag, comm);
}
}
template <class T>
void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
void Pstream::scatter
(
const List<Pstream::commsStruct>& comms,
T& Value,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -132,12 +164,21 @@ void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
Pstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(&Value),
sizeof(T)
sizeof(T),
tag,
comm
);
}
else
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> Value;
}
}
@ -152,12 +193,21 @@ void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
Pstream::scheduled,
myComm.below()[belowI],
reinterpret_cast<const char*>(&Value),
sizeof(T)
sizeof(T),
tag,
comm
);
}
else
{
OPstream toBelow(Pstream::scheduled,myComm.below()[belowI]);
OPstream toBelow
(
Pstream::scheduled,
myComm.below()[belowI],
0,
tag,
comm
);
toBelow << Value;
}
}
@ -166,15 +216,15 @@ void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
template <class T>
void Pstream::scatter(T& Value)
void Pstream::scatter(T& Value, const int tag, const label comm)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
scatter(Pstream::linearCommunication(), Value);
scatter(Pstream::linearCommunication(comm), Value, tag, comm);
}
else
{
scatter(Pstream::treeCommunication(), Value);
scatter(Pstream::treeCommunication(comm), Value, tag, comm);
}
}

View file

@ -49,25 +49,32 @@ template <class T>
void Pstream::gatherList
(
const List<Pstream::commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
if (Values.size() != Pstream::nProcs())
if (Values.size() != Pstream::nProcs(comm))
{
FatalErrorIn
(
"Pstream::gatherList(const List<Pstream::commsStruct>&"
", List<T>)"
"void Pstream::gatherList\n"
"(\n"
" const List<Pstream::commsStruct>& comms,\n"
" List<T>& Values,\n"
" const int tag,\n"
" const label comm\n"
")"
) << "Size of list:" << Values.size()
<< " does not equal the number of processors:"
<< Pstream::nProcs()
<< Pstream::nProcs(comm)
<< Foam::abort(FatalError);
}
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll (myComm.below(), belowI)
@ -84,7 +91,9 @@ void Pstream::gatherList
Pstream::scheduled,
belowID,
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
receivedValues.byteSize(),
tag,
comm
);
Values[belowID] = receivedValues[0];
@ -96,7 +105,7 @@ void Pstream::gatherList
}
else
{
IPstream fromBelow(Pstream::scheduled, belowID);
IPstream fromBelow(Pstream::scheduled, belowID, 0, tag, comm);
fromBelow >> Values[belowID];
if (debug > 1)
@ -132,14 +141,14 @@ void Pstream::gatherList
if (debug > 1)
{
Pout<< " sending to " << myComm.above()
<< " data from: " << Pstream::myProcNo()
<< " data: " << Values[Pstream::myProcNo()] << endl;
<< " data from: " << Pstream::myProcNo(comm)
<< " data: " << Values[Pstream::myProcNo(comm)] << endl;
}
if (contiguous<T>())
{
List<T> sendingValues(belowLeaves.size() + 1);
sendingValues[0] = Values[Pstream::myProcNo()];
sendingValues[0] = Values[Pstream::myProcNo(comm)];
forAll (belowLeaves, leafI)
{
@ -151,13 +160,22 @@ void Pstream::gatherList
Pstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(sendingValues.begin()),
sendingValues.byteSize()
sendingValues.byteSize(),
tag,
comm
);
}
else
{
OPstream toAbove(Pstream::scheduled, myComm.above());
toAbove << Values[Pstream::myProcNo()];
OPstream toAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
toAbove << Values[Pstream::myProcNo(comm)];
forAll (belowLeaves, leafI)
{
@ -178,15 +196,15 @@ void Pstream::gatherList
template <class T>
void Pstream::gatherList(List<T>& Values)
void Pstream::gatherList(List<T>& Values, const int tag, const label comm)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
gatherList(Pstream::linearCommunication(), Values);
gatherList(Pstream::linearCommunication(comm), Values, tag, comm);
}
else
{
gatherList(Pstream::treeCommunication(), Values);
gatherList(Pstream::treeCommunication(comm), Values, tag, comm);
}
}
@ -195,17 +213,24 @@ template <class T>
void Pstream::scatterList
(
const List<Pstream::commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag,
const label comm
)
{
if (Pstream::parRun())
if (Pstream::nProcs(comm) > 1)
{
if (Values.size() != Pstream::nProcs())
if (Values.size() != Pstream::nProcs(comm))
{
FatalErrorIn
(
"Pstream::scatterList(const List<Pstream::commsStruct>&"
", List<T>)"
"void Pstream::scatterList\n"
"(\n"
" const List<Pstream::commsStruct>& comms,\n"
" List<T>& Values,\n"
" const int tag,\n"
" const label comm\n"
")"
) << "Size of list:" << Values.size()
<< " does not equal the number of processors:"
<< Pstream::nProcs()
@ -213,7 +238,7 @@ void Pstream::scatterList
}
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[Pstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -229,7 +254,9 @@ void Pstream::scatterList
Pstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
receivedValues.byteSize(),
tag,
comm
);
forAll (notBelowLeaves, leafI)
@ -239,7 +266,14 @@ void Pstream::scatterList
}
else
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove
(
Pstream::scheduled,
myComm.above(),
0,
tag,
comm
);
forAll (notBelowLeaves, leafI)
{
@ -276,12 +310,14 @@ void Pstream::scatterList
Pstream::scheduled,
belowID,
reinterpret_cast<const char*>(sendingValues.begin()),
sendingValues.byteSize()
sendingValues.byteSize(),
tag,
comm
);
}
else
{
OPstream toBelow(Pstream::scheduled, belowID);
OPstream toBelow(Pstream::scheduled, belowID, 0, tag, comm);
// Send data destined for all other processors below belowID
forAll (notBelowLeaves, leafI)
@ -303,15 +339,15 @@ void Pstream::scatterList
template <class T>
void Pstream::scatterList(List<T>& Values)
void Pstream::scatterList(List<T>& Values, const int tag, const label comm)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum())
if (Pstream::nProcs(comm) < Pstream::nProcsSimpleSum())
{
scatterList(Pstream::linearCommunication(), Values);
scatterList(Pstream::linearCommunication(comm), Values, tag, comm);
}
else
{
scatterList(Pstream::treeCommunication(), Values);
scatterList(Pstream::treeCommunication(comm), Values, tag, comm);
}
}

View file

@ -28,7 +28,6 @@ License
#include "objectRegistry.H"
#include "PstreamReduceOps.H"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::Istream& Foam::regIOobject::readStream()

View file

@ -892,6 +892,7 @@ Foam::BlockMatrixAgglomeration<Type>::restrictMatrix() const
nCoarseEqns_,
coarseOwner,
coarseNeighbour,
Pstream::worldComm, //HJ, AMG Comm fineMesh.comm(),
true
)
);
@ -953,7 +954,8 @@ Foam::BlockMatrixAgglomeration<Type>::restrictMatrix() const
coarseInterfaces,
fineInterface,
fineInterface.interfaceInternalField(agglomIndex_),
fineInterfaceAddr[intI]
fineInterfaceAddr[intI],
Pstream::worldComm //HJ, AMG Comm fineMesh.comm(),
).ptr()
);
}

View file

@ -132,6 +132,15 @@ public:
virtual void expandAddrToZone(labelField&) const = 0;
// Communications support
//- Return communicator used for parallel communication
virtual int comm() const = 0;
//- Return message tag used for sending
virtual int tag() const = 0;
// Transfer buffer access
//- Return contents of the label transfer buffer

View file

@ -110,6 +110,15 @@ public:
virtual const tensorField& reverseT() const = 0;
// Communications support
//- Return communicator used for parallel communication
virtual int comm() const = 0;
//- Return message tag used for sending
virtual int tag() const = 0;
// Transfer buffer access
//- Return contents of the label transfer buffer

View file

@ -76,8 +76,7 @@ public:
processorLduInterface();
// Destructor
//- Destructor
virtual ~processorLduInterface();
@ -95,6 +94,15 @@ public:
virtual const tensorField& forwardT() const = 0;
// Communications support
//- Return communicator used for parallel communication
virtual int comm() const = 0;
//- Return message tag used for sending
virtual int tag() const = 0;
// Transfer functions
//- Raw send function

View file

@ -36,6 +36,8 @@ void Foam::processorLduInterface::send
const UList<Type>& f
) const
{
label nBytes = f.byteSize();
if (commsType == Pstream::blocking || commsType == Pstream::scheduled)
{
OPstream::write
@ -43,7 +45,9 @@ void Foam::processorLduInterface::send
commsType,
neighbProcNo(),
reinterpret_cast<const char*>(f.begin()),
f.byteSize()
nBytes,
tag(),
comm()
);
}
else if (commsType == Pstream::nonBlocking)
@ -55,18 +59,22 @@ void Foam::processorLduInterface::send
commsType,
neighbProcNo(),
receiveBuf_.begin(),
receiveBuf_.size()
receiveBuf_.size(),
tag(),
comm()
);
resizeBuf(sendBuf_, f.byteSize());
memcpy(sendBuf_.begin(), f.begin(), f.byteSize());
resizeBuf(sendBuf_, nBytes);
memcpy(sendBuf_.begin(), f.begin(), nBytes);
OPstream::write
(
commsType,
neighbProcNo(),
sendBuf_.begin(),
f.byteSize()
nBytes,
tag(),
comm()
);
}
else
@ -92,7 +100,9 @@ void Foam::processorLduInterface::receive
commsType,
neighbProcNo(),
reinterpret_cast<char*>(f.begin()),
f.byteSize()
f.byteSize(),
tag(),
comm()
);
}
else if (commsType == Pstream::nonBlocking)
@ -155,7 +165,9 @@ void Foam::processorLduInterface::compressedSend
commsType,
neighbProcNo(),
sendBuf_.begin(),
nBytes
nBytes,
tag(),
comm()
);
}
else if (commsType == Pstream::nonBlocking)
@ -167,7 +179,9 @@ void Foam::processorLduInterface::compressedSend
commsType,
neighbProcNo(),
receiveBuf_.begin(),
receiveBuf_.size()
receiveBuf_.size(),
tag(),
comm()
);
OPstream::write
@ -175,7 +189,9 @@ void Foam::processorLduInterface::compressedSend
commsType,
neighbProcNo(),
sendBuf_.begin(),
nBytes
nBytes,
tag(),
comm()
);
}
else
@ -215,7 +231,9 @@ void Foam::processorLduInterface::compressedReceive
commsType,
neighbProcNo(),
receiveBuf_.begin(),
nBytes
nBytes,
tag(),
comm()
);
}
else if (commsType != Pstream::nonBlocking)

View file

@ -61,7 +61,6 @@ public:
//- Destructor
virtual ~regionCoupleLduInterface();
};

View file

@ -264,6 +264,7 @@ void Foam::GAMGAgglomeration::agglomerateLduAddressing
nCoarseCells,
coarseOwner,
coarseNeighbour,
fineMesh.comm(),
true
)
);
@ -298,7 +299,8 @@ void Foam::GAMGAgglomeration::agglomerateLduAddressing
(
restrictMap
),
fineInterfaceAddr[inti]
fineInterfaceAddr[inti],
fineMesh.comm() // Set up comm per level?
).ptr()
);

View file

@ -132,14 +132,16 @@ public:
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
),
(
lduMesh,
coarseInterfaces,
fineInterface,
localRestrictAddressing,
neighbourRestrictAddressing
neighbourRestrictAddressing,
coarseComm
)
);
@ -154,7 +156,8 @@ public:
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
);
@ -162,7 +165,10 @@ public:
//- Construct from fine-level interface,
// local and neighbour restrict addressing
AMGInterface(const lduPrimitiveMesh& lduMesh)
AMGInterface
(
const lduPrimitiveMesh& lduMesh
)
:
lduMesh_(lduMesh)
{}

View file

@ -35,7 +35,8 @@ Foam::autoPtr<Foam::AMGInterface> Foam::AMGInterface::New
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
)
{
word coupleType(fineInterface.type());
@ -53,7 +54,8 @@ Foam::autoPtr<Foam::AMGInterface> Foam::AMGInterface::New
" const lduInterfacePtrsList& coarseInterfaces,\n"
" const lduInterface& fineInterface,\n"
" const labelField& localRestrictAddressing,\n"
" const labelField& neighbourRestrictAddressing\n"
" const labelField& neighbourRestrictAddressing,\n"
" const label coarseComm\n"
")"
) << "Unknown AMGInterface type " << coupleType << ".\n"
<< "Valid AMGInterface types are :"
@ -69,7 +71,8 @@ Foam::autoPtr<Foam::AMGInterface> Foam::AMGInterface::New
coarseInterfaces,
fineInterface,
localRestrictAddressing,
neighbourRestrictAddressing
neighbourRestrictAddressing,
coarseComm
)
);
}

View file

@ -48,7 +48,8 @@ Foam::cyclicAMGInterface::cyclicAMGInterface
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm // Not needed
)
:
AMGInterface(lduMesh),

View file

@ -84,12 +84,12 @@ public:
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
);
// Destructor
//- Destructor
virtual ~cyclicAMGInterface();

View file

@ -48,7 +48,8 @@ Foam::cyclicGGIAMGInterface::cyclicGGIAMGInterface
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
)
:
ggiAMGInterface
@ -57,7 +58,8 @@ Foam::cyclicGGIAMGInterface::cyclicGGIAMGInterface
coarseInterfaces,
fineInterface,
localRestrictAddressing,
neighbourRestrictAddressing
neighbourRestrictAddressing,
coarseComm
)
{}

View file

@ -70,7 +70,8 @@ public:
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
);

View file

@ -195,7 +195,8 @@ Foam::ggiAMGInterface::ggiAMGInterface
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
)
:
AMGInterface(lduMesh),
@ -203,6 +204,8 @@ Foam::ggiAMGInterface::ggiAMGInterface
zoneSize_(0),
zoneAddressing_(),
procMasterFaces_(),
comm_(coarseComm),
tag_(refCast<const ggiLduInterface>(fineInterface).tag()),
mapPtr_(NULL)
{
// New algorithm will assemble local clusters on the master side and

View file

@ -74,8 +74,15 @@ class ggiAMGInterface
// to allow the slave to insert faces in the same order
labelListList procMasterFaces_;
// Parallel communication
//- Communicator to use for parallel communication
const label comm_;
//- Message tag used for sending
const int tag_;
//- Map-distribute comms tool
mutable mapDistribute* mapPtr_;
@ -118,7 +125,8 @@ public:
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
);
@ -137,6 +145,21 @@ public:
}
// Communications support
//- Return communicator used for parallel communication
virtual int comm() const
{
return comm_;
}
//- Return message tag used for sending
virtual int tag() const
{
return tag_;
}
// Agglomeration
//- Agglomerating the given fine-level coefficients and return

View file

@ -54,14 +54,17 @@ Foam::mixingPlaneAMGInterface::mixingPlaneAMGInterface
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
)
:
AMGInterface(lduMesh),
fineMixingPlaneInterface_
(
refCast<const mixingPlaneLduInterface>(fineInterface)
)
),
comm_(coarseComm),
tag_(refCast<const mixingPlaneLduInterface>(fineInterface).tag())
{}

View file

@ -65,6 +65,15 @@ class mixingPlaneAMGInterface
const mixingPlaneLduInterface& fineMixingPlaneInterface_;
// Parallel communication
//- Communicator to use for parallel communication
const label comm_;
//- Message tag used for sending
const int tag_;
// Private Member Functions
//- Disallow default bitwise copy construct
@ -96,12 +105,12 @@ public:
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
);
// Destructor
// -Destructor
virtual ~mixingPlaneAMGInterface();
@ -116,6 +125,21 @@ public:
}
// Communications support
//- Return communicator used for parallel communication
virtual int comm() const
{
return comm_;
}
//- Return message tag used for sending
virtual int tag() const
{
return tag_;
}
// Agglomeration
//- Agglomerating the given fine-level coefficients and return

View file

@ -48,11 +48,14 @@ Foam::processorAMGInterface::processorAMGInterface
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
)
:
AMGInterface(lduMesh),
fineProcInterface_(refCast<const processorLduInterface>(fineInterface))
fineProcInterface_(refCast<const processorLduInterface>(fineInterface)),
comm_(coarseComm),
tag_(refCast<const processorLduInterface>(fineInterface).tag())
{
// Make a lookup table of entries for owner/neighbour
HashTable<SLList<label>, label, Hash<label> > neighboursTable

View file

@ -59,6 +59,12 @@ class processorAMGInterface
// agglomerated
const processorLduInterface& fineProcInterface_;
//- Communicator to use for parallel communication
const label comm_;
//- Message tag used for sending
const int tag_;
// Private Member Functions
@ -84,12 +90,12 @@ public:
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
);
// Destructor
//- Destructor
virtual ~processorAMGInterface();
@ -104,6 +110,21 @@ public:
}
// Communications support
//- Return communicator used for parallel communication
virtual int comm() const
{
return comm_;
}
//- Return message tag used for sending
virtual int tag() const
{
return tag_;
}
// Interface transfer functions
//- Initialise interface data transfer

View file

@ -48,7 +48,8 @@ Foam::regionCoupleAMGInterface::regionCoupleAMGInterface
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
)
:
ggiAMGInterface
@ -57,7 +58,8 @@ Foam::regionCoupleAMGInterface::regionCoupleAMGInterface
coarseInterfaces,
fineInterface,
localRestrictAddressing,
neighbourRestrictAddressing
neighbourRestrictAddressing,
coarseComm
),
coupled_(fineInterface.coupled())
{}

View file

@ -76,12 +76,12 @@ public:
const lduInterfacePtrsList& coarseInterfaces,
const lduInterface& fineInterface,
const labelField& localRestrictAddressing,
const labelField& neighbourRestrictAddressing
const labelField& neighbourRestrictAddressing,
const label coarseComm
);
// Destructor
//- Destructor
virtual ~regionCoupleAMGInterface();

View file

@ -59,8 +59,7 @@ public:
// Constructors
// Destructor
//- Destructor
virtual ~lduMesh()
{}
@ -78,6 +77,10 @@ public:
//- Return a list of pointers for each patch
// with only those pointing to interfaces being set
virtual lduInterfacePtrsList interfaces() const = 0;
//- Return communicator used for parallel communication
virtual int comm() const = 0;
};

View file

@ -69,6 +69,9 @@ class lduPrimitiveMesh
// Note this does must be held as a copy. HJ, 20/Feb/2009
lduSchedule patchSchedule_;
//- Communicator to use for parallel communication
const label comm_;
// Private Member Functions
@ -89,12 +92,14 @@ public:
(
const label nCells,
const unallocLabelList& l,
const unallocLabelList& u
const unallocLabelList& u,
const label comm
)
:
lduAddressing(nCells),
lowerAddr_(l),
upperAddr_(u)
upperAddr_(u),
comm_(comm)
{}
@ -104,12 +109,14 @@ public:
const label nCells,
labelList& l,
labelList& u,
const label comm,
bool reUse
)
:
lduAddressing(nCells),
lowerAddr_(l, reUse),
upperAddr_(u, reUse)
upperAddr_(u, reUse),
comm_(comm)
{}
@ -194,6 +201,12 @@ public:
{
return patchSchedule_;
}
//- Return communicator used for parallel communication
virtual label comm() const
{
return comm_;
}
};

View file

@ -111,8 +111,7 @@ public:
);
// Destructor
//- Destructor
virtual ~ggiPointPatch();
@ -122,6 +121,21 @@ public:
//- Return true because this patch is coupled
virtual bool coupled() const;
// Communications support
//- Return communicator used for communication
virtual label comm() const
{
return ggiPolyPatch_.comm();
}
//- Return message tag to use for communication
virtual int tag() const
{
return ggiPolyPatch_.tag();
}
};

View file

@ -124,8 +124,7 @@ public:
);
// Destructor
//- Destructor
virtual ~processorPointPatch();
@ -175,6 +174,21 @@ public:
}
// Communications support
//- Return communicator used for communication
virtual label comm() const
{
return procPolyPatch_.comm();
}
//- Return message tag to use for communication
virtual int tag() const
{
return procPolyPatch_.tag();
}
// Access functions for demand driven data
//- Return mesh points

View file

@ -33,7 +33,8 @@ License
Foam::List<Foam::labelPair> Foam::mapDistribute::schedule
(
const labelListList& subMap,
const labelListList& constructMap
const labelListList& constructMap,
const int tag
)
{
// Communications: send and receive processor
@ -74,7 +75,7 @@ Foam::List<Foam::labelPair> Foam::mapDistribute::schedule
slave++
)
{
IPstream fromSlave(Pstream::scheduled, slave);
IPstream fromSlave(Pstream::scheduled, slave, 0, tag);
List<labelPair> nbrData(fromSlave);
forAll (nbrData, i)
@ -95,18 +96,24 @@ Foam::List<Foam::labelPair> Foam::mapDistribute::schedule
slave++
)
{
OPstream toSlave(Pstream::scheduled, slave);
OPstream toSlave(Pstream::scheduled, slave, 0, tag);
toSlave << allComms;
}
}
else
{
{
OPstream toMaster(Pstream::scheduled, Pstream::masterNo());
OPstream toMaster(Pstream::scheduled, Pstream::masterNo(), 0, tag);
toMaster << allComms;
}
{
IPstream fromMaster(Pstream::scheduled, Pstream::masterNo());
IPstream fromMaster
(
Pstream::scheduled,
Pstream::masterNo(),
0,
tag
);
fromMaster >> allComms;
}
}
@ -156,7 +163,7 @@ const Foam::List<Foam::labelPair>& Foam::mapDistribute::schedule() const
(
new List<labelPair>
(
schedule(subMap_, constructMap_)
schedule(subMap_, constructMap_, Pstream::msgType())
)
);
}
@ -311,7 +318,7 @@ Foam::mapDistribute::mapDistribute(const mapDistribute& map)
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::mapDistribute::compact(const boolList& elemIsUsed)
void Foam::mapDistribute::compact(const boolList& elemIsUsed, const int tag)
{
// 1. send back to sender. Have him delete the corresponding element
// from the submap and do the same to the constructMap locally
@ -319,7 +326,12 @@ void Foam::mapDistribute::compact(const boolList& elemIsUsed)
// Send elemIsUsed field to neighbour. Use nonblocking code from
// mapDistribute but in reverse order.
if (Pstream::parRun())
{
label startOfRequests = Pstream::nRequests();
// Set up receives from neighbours
List<boolList> sendFields(Pstream::nProcs());
for (label domain = 0; domain < Pstream::nProcs(); domain++)
@ -340,7 +352,8 @@ void Foam::mapDistribute::compact(const boolList& elemIsUsed)
Pstream::nonBlocking,
domain,
reinterpret_cast<const char*>(subField.begin()),
subField.size()*sizeof(bool)
subField.size()*sizeof(bool),
tag
);
}
}
@ -361,7 +374,8 @@ void Foam::mapDistribute::compact(const boolList& elemIsUsed)
Pstream::nonBlocking,
domain,
reinterpret_cast<char*>(recvFields[domain].begin()),
recvFields[domain].size()*sizeof(bool)
recvFields[domain].size()*sizeof(bool),
tag
);
}
}
@ -382,8 +396,7 @@ void Foam::mapDistribute::compact(const boolList& elemIsUsed)
// Wait for all to finish
OPstream::waitRequests();
IPstream::waitRequests();
Pstream::waitRequests(startOfRequests);
// Compact out all submap entries that are referring to unused elements

View file

@ -166,7 +166,8 @@ public:
static List<labelPair> schedule
(
const labelListList& subMap,
const labelListList& constructMap
const labelListList& constructMap,
const int tag
);
//- Return a schedule. Demand driven. See above.
@ -178,8 +179,11 @@ public:
//- Compact maps. Gets per field a bool whether it is used locally
// and works out itself what this side and sender side can remove
// from maps.
void compact(const boolList& elemIsUsed);
void compact
(
const boolList& elemIsUsed,
const int tag = Pstream::msgType()
);
//- Distribute data. Note:schedule only used for Pstream::scheduled
// for now, all others just use send-to-all, receive-from-all.
@ -191,7 +195,8 @@ public:
const label constructSize,
const labelListList& subMap,
const labelListList& constructMap,
List<T>&
List<T>&,
const int tag = Pstream::msgType()
);
//- Distribute data. If multiple processors writing to same
@ -206,12 +211,17 @@ public:
const labelListList& constructMap,
List<T>&,
const CombineOp& cop,
const T& nullValue
const T& nullValue,
const int tag = Pstream::msgType()
);
//- Distribute data using default commsType.
template<class T>
void distribute(List<T>& fld) const
void distribute
(
List<T>& fld,
const int tag = Pstream::msgType()
) const
{
if
(

View file

@ -36,9 +36,35 @@ void Foam::mapDistribute::distribute
const label constructSize,
const labelListList& subMap,
const labelListList& constructMap,
List<T>& field
List<T>& field,
const int tag
)
{
if (!Pstream::parRun())
{
// Do only me to me.
const labelList& mySubMap = subMap[Pstream::myProcNo()];
List<T> subField(mySubMap.size());
forAll(mySubMap, i)
{
subField[i] = field[mySubMap[i]];
}
// Receive sub field from myself (subField)
const labelList& map = constructMap[Pstream::myProcNo()];
field.setSize(constructSize);
forAll(map, i)
{
field[map[i]] = subField[i];
}
return;
}
if (commsType == Pstream::blocking)
{
// Since buffered sending can reuse the field to collect the
@ -51,7 +77,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
OPstream toNbr(Pstream::blocking, domain);
OPstream toNbr(Pstream::blocking, domain, 0, tag);
toNbr << UIndirectList<T>(field, map);
}
}
@ -82,7 +108,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
IPstream fromNbr(Pstream::blocking, domain);
IPstream fromNbr(Pstream::blocking, domain, 0, tag);
List<T> subField(fromNbr);
checkReceivedSize(domain, map.size(), subField.size());
@ -116,22 +142,24 @@ void Foam::mapDistribute::distribute
forAll(schedule, i)
{
const labelPair& twoProcs = schedule[i];
// twoProcs is a swap pair of processors. The first one is the
// one that needs to send first and then receive.
label sendProc = twoProcs[0];
label recvProc = twoProcs[1];
if (Pstream::myProcNo() == sendProc)
{
// I am sender. Send to recvProc.
OPstream toNbr(Pstream::scheduled, recvProc);
// I am send first, receive next
{
OPstream toNbr(Pstream::scheduled, recvProc, 0, tag);
toNbr << UIndirectList<T>(field, subMap[recvProc]);
}
else
{
// I am receiver. Receive from sendProc.
IPstream fromNbr(Pstream::scheduled, sendProc);
IPstream fromNbr(Pstream::scheduled, recvProc, 0, tag);
List<T> subField(fromNbr);
const labelList& map = constructMap[sendProc];
const labelList& map = constructMap[recvProc];
checkReceivedSize(recvProc, map.size(), subField.size());
@ -141,29 +169,97 @@ void Foam::mapDistribute::distribute
}
}
}
else
{
// I am receive first, send next
{
IPstream fromNbr(Pstream::scheduled, sendProc, 0, tag);
List<T> subField(fromNbr);
const labelList& map = constructMap[sendProc];
checkReceivedSize(sendProc, map.size(), subField.size());
forAll(map, i)
{
newField[map[i]] = subField[i];
}
}
{
OPstream toNbr(Pstream::scheduled, sendProc, 0, tag);
toNbr << UIndirectList<T>(field, subMap[sendProc]);
}
}
}
field.transfer(newField);
}
else if (commsType == Pstream::nonBlocking)
{
label nOutstanding = Pstream::nRequests();
if (!contiguous<T>())
{
FatalErrorIn
(
"template<class T>\n"
"void mapDistribute::distribute\n"
"(\n"
" const Pstream::commsTypes commsType,\n"
" const List<labelPair>& schedule,\n"
" const label constructSize,\n"
" const labelListList& subMap,\n"
" const labelListList& constructMap,\n"
" List<T>& field\n"
")\n"
) << "Non-blocking only supported for contiguous data."
<< exit(FatalError);
// Stream data into buffer
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = subMap[domain];
if (domain != Pstream::myProcNo() && map.size())
{
// Put data into send buffer
OPstream toDomain(Pstream::nonBlocking, domain, 0, tag);
toDomain << UIndirectList<T>(field, map);
}
}
// Start receiving. Do not block.
{
// Set up 'send' to myself
const labelList& mySubMap = subMap[Pstream::myProcNo()];
List<T> mySubField(mySubMap.size());
forAll(mySubMap, i)
{
mySubField[i] = field[mySubMap[i]];
}
// Combine bits. Note that can reuse field storage
field.setSize(constructSize);
// Receive sub field from myself
{
const labelList& map = constructMap[Pstream::myProcNo()];
forAll(map, i)
{
field[map[i]] = mySubField[i];
}
}
}
// Block ourselves, waiting only for the current comms
Pstream::waitRequests(nOutstanding);
// Consume
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = constructMap[domain];
if (domain != Pstream::myProcNo() && map.size())
{
IPstream str(Pstream::nonBlocking, domain, 0, tag);
List<T> recvField(str);
checkReceivedSize(domain, map.size(), recvField.size());
forAll(map, i)
{
field[map[i]] = recvField[i];
}
}
}
}
else
{
// Set up sends to neighbours
List<List<T > > sendFields(Pstream::nProcs());
@ -175,9 +271,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
List<T>& subField = sendFields[domain];
subField.setSize(map.size());
forAll(map, i)
{
subField[i] = field[map[i]];
@ -188,7 +282,8 @@ void Foam::mapDistribute::distribute
Pstream::nonBlocking,
domain,
reinterpret_cast<const char*>(subField.begin()),
subField.size()*sizeof(T)
subField.byteSize(),
tag
);
}
}
@ -209,7 +304,8 @@ void Foam::mapDistribute::distribute
Pstream::nonBlocking,
domain,
reinterpret_cast<char*>(recvFields[domain].begin()),
recvFields[domain].size()*sizeof(T)
recvFields[domain].byteSize(),
tag
);
}
}
@ -248,8 +344,8 @@ void Foam::mapDistribute::distribute
// Wait for all to finish
OPstream::waitRequests();
IPstream::waitRequests();
Pstream::waitRequests(nOutstanding);
// Collect neighbour fields
@ -259,16 +355,14 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
checkReceivedSize
(
domain,
map.size(),
recvFields[domain].size()
);
const List<T>& subField = recvFields[domain];
checkReceivedSize(domain, map.size(), subField.size());
forAll(map, i)
{
field[map[i]] = recvFields[domain][i];
field[map[i]] = subField[i];
}
}
}
}
@ -294,9 +388,35 @@ void Foam::mapDistribute::distribute
const labelListList& constructMap,
List<T>& field,
const CombineOp& cop,
const T& nullValue
const T& nullValue,
const int tag
)
{
if (!Pstream::parRun())
{
// Do only me to me.
const labelList& mySubMap = subMap[Pstream::myProcNo()];
List<T> subField(mySubMap.size());
forAll(mySubMap, i)
{
subField[i] = field[mySubMap[i]];
}
// Receive sub field from myself (subField)
const labelList& map = constructMap[Pstream::myProcNo()];
field.setSize(constructSize);
field = nullValue;
forAll(map, i)
{
cop(field[map[i]], subField[i]);
}
return;
}
if (commsType == Pstream::blocking)
{
// Since buffered sending can reuse the field to collect the
@ -309,7 +429,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
OPstream toNbr(Pstream::blocking, domain);
OPstream toNbr(Pstream::blocking, domain, 0, tag);
toNbr << UIndirectList<T>(field, map);
}
}
@ -341,28 +461,10 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
IPstream fromNbr(Pstream::blocking, domain);
IPstream fromNbr(Pstream::blocking, domain, 0, tag);
List<T> subField(fromNbr);
if (subField.size() != map.size())
{
FatalErrorIn
(
"template<class T>\n"
"void mapDistribute::distribute\n"
"(\n"
" const Pstream::commsTypes commsType,\n"
" const List<labelPair>& schedule,\n"
" const label constructSize,\n"
" const labelListList& subMap,\n"
" const labelListList& constructMap,\n"
" List<T>& field\n"
")\n"
) << "Expected from processor " << domain
<< " " << map.size() << " but received "
<< subField.size() << " elements."
<< abort(FatalError);
}
checkReceivedSize(domain, map.size(), subField.size());
forAll(map, i)
{
@ -393,42 +495,25 @@ void Foam::mapDistribute::distribute
forAll(schedule, i)
{
const labelPair& twoProcs = schedule[i];
// twoProcs is a swap pair of processors. The first one is the
// one that needs to send first and then receive.
label sendProc = twoProcs[0];
label recvProc = twoProcs[1];
if (Pstream::myProcNo() == sendProc)
{
// I am sender. Send to recvProc.
OPstream toNbr(Pstream::scheduled, recvProc);
// I am send first, receive next
{
OPstream toNbr(Pstream::scheduled, recvProc, 0, tag);
toNbr << UIndirectList<T>(field, subMap[recvProc]);
}
else
{
// I am receiver. Receive from sendProc.
IPstream fromNbr(Pstream::scheduled, sendProc);
IPstream fromNbr(Pstream::scheduled, recvProc, 0, tag);
List<T> subField(fromNbr);
const labelList& map = constructMap[recvProc];
const labelList& map = constructMap[sendProc];
if (subField.size() != map.size())
{
FatalErrorIn
(
"template<class T>\n"
"void mapDistribute::distribute\n"
"(\n"
" const Pstream::commsTypes commsType,\n"
" const List<labelPair>& schedule,\n"
" const label constructSize,\n"
" const labelListList& subMap,\n"
" const labelListList& constructMap,\n"
" List<T>& field\n"
")\n"
) << "Expected from processor " << sendProc
<< " " << map.size() << " but received "
<< subField.size() << " elements."
<< abort(FatalError);
}
checkReceivedSize(recvProc, map.size(), subField.size());
forAll(map, i)
{
@ -436,28 +521,92 @@ void Foam::mapDistribute::distribute
}
}
}
else
{
// I am receive first, send next
{
IPstream fromNbr(Pstream::scheduled, sendProc, 0, tag);
List<T> subField(fromNbr);
const labelList& map = constructMap[sendProc];
checkReceivedSize(sendProc, map.size(), subField.size());
forAll(map, i)
{
cop(newField[map[i]], subField[i]);
}
}
{
OPstream toNbr(Pstream::scheduled, sendProc, 0, tag);
toNbr << UIndirectList<T>(field, subMap[sendProc]);
}
}
}
field.transfer(newField);
}
else if (commsType == Pstream::nonBlocking)
{
label nOutstanding = Pstream::nRequests();
if (!contiguous<T>())
{
FatalErrorIn
(
"template<class T>\n"
"void mapDistribute::distribute\n"
"(\n"
" const Pstream::commsTypes commsType,\n"
" const List<labelPair>& schedule,\n"
" const label constructSize,\n"
" const labelListList& subMap,\n"
" const labelListList& constructMap,\n"
" List<T>& field\n"
")\n"
) << "Non-blocking only supported for contiguous data."
<< exit(FatalError);
// Stream data into buffer
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = subMap[domain];
if (domain != Pstream::myProcNo() && map.size())
{
// Put data into send buffer
OPstream toDomain(Pstream::nonBlocking, domain, 0, tag);
toDomain << UIndirectList<T>(field, map);
}
}
// Start receiving. Do not block.
{
// Set up 'send' to myself
List<T> mySubField(field, subMap[Pstream::myProcNo()]);
// Combine bits. Note that can reuse field storage
field.setSize(constructSize);
field = nullValue;
// Receive sub field from myself
{
const labelList& map = constructMap[Pstream::myProcNo()];
forAll(map, i)
{
cop(field[map[i]], mySubField[i]);
}
}
}
// Block ourselves, waiting only for the current comms
Pstream::waitRequests(nOutstanding);
// Consume
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = constructMap[domain];
if (domain != Pstream::myProcNo() && map.size())
{
IPstream str(Pstream::nonBlocking, domain, 0, tag);
List<T> recvField(str);
checkReceivedSize(domain, map.size(), recvField.size());
forAll(map, i)
{
cop(field[map[i]], recvField[i]);
}
}
}
}
else
{
// Set up sends to neighbours
List<List<T> > sendFields(Pstream::nProcs());
@ -480,7 +629,8 @@ void Foam::mapDistribute::distribute
Pstream::nonBlocking,
domain,
reinterpret_cast<const char*>(subField.begin()),
subField.size()*sizeof(T)
subField.size()*sizeof(T),
tag
);
}
}
@ -501,7 +651,8 @@ void Foam::mapDistribute::distribute
Pstream::nonBlocking,
domain,
reinterpret_cast<char*>(recvFields[domain].begin()),
recvFields[domain].size()*sizeof(T)
recvFields[domain].size()*sizeof(T),
tag
);
}
}
@ -538,9 +689,8 @@ void Foam::mapDistribute::distribute
// Wait for all to finish
Pstream::waitRequests(nOutstanding);
OPstream::waitRequests();
IPstream::waitRequests();
// Collect neighbour fields
@ -550,38 +700,22 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
if (recvFields[domain].size() != map.size())
{
FatalErrorIn
(
"template<class T>\n"
"void mapDistribute::distribute\n"
"(\n"
" const Pstream::commsTypes commsType,\n"
" const List<labelPair>& schedule,\n"
" const label constructSize,\n"
" const labelListList& subMap,\n"
" const labelListList& constructMap,\n"
" List<T>& field\n"
")\n"
) << "Expected from processor " << domain
<< " " << map.size() << " but received "
<< recvFields[domain].size() << " elements."
<< abort(FatalError);
}
const List<T>& subField = recvFields[domain];
checkReceivedSize(domain, map.size(), subField.size());
forAll(map, i)
{
cop(field[map[i]], recvFields[domain][i]);
cop(field[map[i]], subField[i]);
}
}
}
}
}
else
{
// This needs to be cleaned up: temporary solution. HJ, 15/Jun/2014
FatalErrorIn("mapDistribute::distribute(..)")
<< "Unknown communication schedule " << label(commsType)
<< "Unknown communication schedule " << commsType
<< abort(FatalError);
}
}

View file

@ -244,6 +244,7 @@ Foam::polyMesh::polyMesh(const IOobject& io)
bounds_(allPoints_),
geometricD_(Vector<label>::zero),
solutionD_(Vector<label>::zero),
comm_(Pstream::worldComm),
pointZones_
(
IOobject
@ -441,6 +442,7 @@ Foam::polyMesh::polyMesh
bounds_(allPoints_, syncPar),
geometricD_(Vector<label>::zero),
solutionD_(Vector<label>::zero),
comm_(Pstream::worldComm),
pointZones_
(
IOobject
@ -601,6 +603,7 @@ Foam::polyMesh::polyMesh
bounds_(allPoints_, syncPar),
geometricD_(Vector<label>::zero),
solutionD_(Vector<label>::zero),
comm_(Pstream::worldComm),
pointZones_
(
IOobject
@ -925,6 +928,18 @@ Foam::label Foam::polyMesh::nSolutionD() const
}
Foam::label Foam::polyMesh::comm() const
{
return comm_;
}
Foam::label& Foam::polyMesh::comm()
{
return comm_;
}
// Add boundary patches. Constructor helper
void Foam::polyMesh::addPatches
(

View file

@ -134,6 +134,9 @@ private:
// Defined according to the presence of empty patches
mutable Vector<label> solutionD_;
//- Communicator used for parallel communication
label comm_;
// Zoning information
@ -293,8 +296,7 @@ public:
);
// Destructor
//- Destructor
virtual ~polyMesh();
@ -386,6 +388,18 @@ public:
//- Return the number of valid solved-for dimensions in the mesh
label nSolutionD() const;
// Communication support
//- Return communicator used for parallel communication
label comm() const;
//- Return communicator used for parallel communication
label& comm();
// Point, face and cell zones
//- Return point zone mesh
const pointZoneMesh& pointZones() const
{
@ -404,6 +418,7 @@ public:
return cellZones_;
}
//- Return parallel info
const globalMeshData& globalData() const;

View file

@ -31,6 +31,7 @@ Contributor
#include "ggiPolyPatch.H"
#include "polyBoundaryMesh.H"
#include "polyMesh.H"
#include "addToRunTimeSelectionTable.H"
#include "demandDrivenData.H"
#include "polyPatchID.H"
@ -65,7 +66,7 @@ bool Foam::ggiPolyPatch::active() const
// For decomposition and reconstruction
// If not runing in parallel and the patch is not local, this is a serial
// operation on a piece of a parallel decomposition and is therefore
// inactive. HJ, 5/Spe/2016
// inactive. HJ, 5/Sep/2016
if (!Pstream::parRun() && !localParallel())
{
return false;
@ -770,6 +771,18 @@ const Foam::faceZone& Foam::ggiPolyPatch::zone() const
}
Foam::label Foam::ggiPolyPatch::comm() const
{
return boundaryMesh().mesh().comm();
}
int Foam::ggiPolyPatch::tag() const
{
return Pstream::msgType();
}
const Foam::labelList& Foam::ggiPolyPatch::zoneAddressing() const
{
if (!zoneAddressingPtr_)

View file

@ -303,6 +303,15 @@ public:
const faceZone& zone() const;
// Communications support
//- Return communicator used for communication
virtual label comm() const;
//- Return message tag to use for communication
virtual int tag() const;
// Interpolation data
//- Is this the master side?

View file

@ -805,6 +805,18 @@ Foam::mixingPlanePolyPatch::patchToPatch() const
}
Foam::label Foam::mixingPlanePolyPatch::comm() const
{
return boundaryMesh().mesh().comm();
}
int Foam::mixingPlanePolyPatch::tag() const
{
return Pstream::msgType();
}
const Foam::vectorField&
Foam::mixingPlanePolyPatch::reconFaceCellCentres() const
{

View file

@ -278,8 +278,7 @@ public:
}
// Destructor
//- Destructor
virtual ~mixingPlanePolyPatch();
@ -346,6 +345,15 @@ public:
const mixingPlaneZoneInterpolation& patchToPatch() const;
// Communications support
//- Return communicator used for communication
virtual label comm() const;
//- Return message tag to use for communication
virtual int tag() const;
// Interpolation functions
//- Interpolate face field to profile: given field on a

View file

@ -27,12 +27,13 @@ License
#include "addToRunTimeSelectionTable.H"
#include "dictionary.H"
#include "SubField.H"
#include "demandDrivenData.H"
#include "matchPoints.H"
#include "OFstream.H"
#include "polyBoundaryMesh.H"
#include "polyMesh.H"
#include "foamTime.H"
#include "transformList.H"
#include "demandDrivenData.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -134,6 +135,18 @@ Foam::processorPolyPatch::~processorPolyPatch()
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::processorPolyPatch::comm() const
{
return boundaryMesh().mesh().comm();
}
int Foam::processorPolyPatch::tag() const
{
return Pstream::msgType();
}
void Foam::processorPolyPatch::initAddressing()
{
polyPatch::initAddressing();

View file

@ -43,6 +43,7 @@ SourceFiles
#define processorPolyPatch_H
#include "coupledPolyPatch.H"
#include "polyBoundaryMesh.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -189,8 +190,7 @@ public:
}
// Destructor
//- Destructor
virtual ~processorPolyPatch();
@ -208,6 +208,15 @@ public:
return neighbProcNo_;
}
// Communications support
//- Return communicator used for communication
virtual label comm() const;
//- Return message tag to use for communication
virtual int tag() const;
//- Does the processor own the patch ?
bool owner() const
{

View file

@ -921,6 +921,18 @@ void Foam::regionCouplePolyPatch::detach() const
}
Foam::label Foam::regionCouplePolyPatch::comm() const
{
return boundaryMesh().mesh().comm();
}
int Foam::regionCouplePolyPatch::tag() const
{
return Pstream::msgType();
}
const Foam::labelList& Foam::regionCouplePolyPatch::zoneAddressing() const
{
if (!zoneAddressingPtr_)

View file

@ -351,6 +351,15 @@ public:
void detach() const;
// Communications support
//- Return communicator used for communication
virtual label comm() const;
//- Return message tag to use for communication
virtual int tag() const;
// Interpolation data
//- Is this the master side?

View file

@ -651,7 +651,8 @@ Foam::autoPtr<Foam::amgMatrix> Foam::pamgPolicy::restrictMatrix
coarseInterfaces,
fineInterface,
fineInterface.interfaceInternalField(child_),
fineInterfaceAddr[intI]
fineInterfaceAddr[intI],
Pstream::worldComm //HJ, AMG Comm fineMesh.comm()
).ptr()
);
}

View file

@ -61,6 +61,7 @@ void tetPolyMesh::clearOut() const
clearOutParPointData();
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
// Construct from components
@ -71,6 +72,7 @@ tetPolyMesh::tetPolyMesh(const polyMesh& pMesh)
boundary_(*this, pMesh.boundaryMesh()),
faceOffset_(mesh_.nPoints()),
cellOffset_(faceOffset_ + mesh_.nFaces()),
comm_(Pstream::worldComm),
nPoints_(-1),
nEdges_(-1),
nTets_(-1),
@ -105,6 +107,18 @@ tetPolyMesh::~tetPolyMesh()
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::tetPolyMesh::comm() const
{
return comm_;
}
Foam::label& Foam::tetPolyMesh::comm()
{
return comm_;
}
// Return number of edges in decomposition for a face
label tetPolyMesh::nEdgesForFace(const label faceID) const
{

View file

@ -138,6 +138,9 @@ class tetPolyMesh
//- Offset in numbering to first cell centre
label cellOffset_;
//- Communicator used for parallel communication
label comm_;
// Demand-driven data
@ -205,9 +208,9 @@ public:
//- Construct from components
explicit tetPolyMesh(const polyMesh& pMesh);
// Destructor
~tetPolyMesh();
//- Destructor
virtual ~tetPolyMesh();
// Member Functions
@ -370,6 +373,15 @@ public:
const edgeList& parallelEdges() const;
// Communication support
//- Return communicator used for parallel communication
label comm() const;
//- Return communicator used for parallel communication
label& comm();
// Edit
//- Update mesh topology using the morph engine